This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

The following commit(s) were added to refs/heads/master by this push:
     new b7ff890  Camel-Kafka: Fixed CS
b7ff890 is described below

commit b7ff890a4fb07b1c1ffd9fd299caa16048c9f88f
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Mon Mar 25 08:16:57 2019 +0100

    Camel-Kafka: Fixed CS
---
 .../camel/component/kafka/KafkaConsumer.java       | 97 +++++++++++++---------
 .../KafkaConsumerRebalancePartitionRevokeTest.java |  8 +-
 .../kafka/KafkaConsumerRebalanceTest.java          |  6 +-
 3 files changed, 66 insertions(+), 45 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 9417dc7..e644a10 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -106,14 +106,13 @@ public class KafkaConsumer extends DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
-        log.info("Starting Kafka consumer on topic: {} with breakOnFirstError: {}",
-                 endpoint.getConfiguration().getTopic(), endpoint.getConfiguration().isBreakOnFirstError());
+        log.info("Starting Kafka consumer on topic: {} with breakOnFirstError: {}", endpoint.getConfiguration().getTopic(), endpoint.getConfiguration().isBreakOnFirstError());
         super.doStart();

         // is the offset repository already started?
StateRepository repo = endpoint.getConfiguration().getOffsetRepository(); if (repo instanceof ServiceSupport) { - boolean started = ((ServiceSupport) repo).isStarted(); + boolean started = ((ServiceSupport)repo).isStarted(); // if not already started then we would do that and also stop it if (!started) { stopOffsetRepo = true; @@ -132,7 +131,8 @@ public class KafkaConsumer extends DefaultConsumer { for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) { KafkaFetchRecords task = new KafkaFetchRecords(topic, pattern, i + "", getProps()); - // pre-initialize task during startup so if there is any error we have it thrown asap + // pre-initialize task during startup so if there is any error we + // have it thrown asap task.preInit(); executor.submit(task); tasks.add(task); @@ -190,7 +190,8 @@ public class KafkaConsumer extends DefaultConsumer { while (reConnect) { try { if (!first) { - // re-initialize on re-connect so we have a fresh consumer + // re-initialize on re-connect so we have a fresh + // consumer doInit(); } } catch (Throwable e) { @@ -211,7 +212,8 @@ public class KafkaConsumer extends DefaultConsumer { first = false; - // doRun keeps running until we either shutdown or is told to re-connect + // doRun keeps running until we either shutdown or is told to + // re-connect reConnect = doRun(); } } @@ -224,9 +226,11 @@ public class KafkaConsumer extends DefaultConsumer { // create consumer ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader(); try { - // Kafka uses reflection for loading authentication settings, use its classloader + // Kafka uses reflection for loading authentication settings, + // use its classloader Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader()); - // this may throw an exception if something is wrong with kafka consumer + // this may throw an exception if something is wrong with kafka + // consumer this.consumer = new 
org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps); } finally { Thread.currentThread().setContextClassLoader(threadClassLoader); @@ -235,7 +239,8 @@ public class KafkaConsumer extends DefaultConsumer { @SuppressWarnings("unchecked") protected boolean doRun() { - // allow to re-connect thread in case we use that to retry failed messages + // allow to re-connect thread in case we use that to retry failed + // messages boolean reConnect = false; boolean unsubscribing = false; @@ -250,19 +255,23 @@ public class KafkaConsumer extends DefaultConsumer { StateRepository<String, String> offsetRepository = endpoint.getConfiguration().getOffsetRepository(); if (offsetRepository != null) { - // This poll to ensures we have an assigned partition otherwise seek won't work + // This poll to ensures we have an assigned partition + // otherwise seek won't work ConsumerRecords poll = consumer.poll(100); - for (TopicPartition topicPartition : (Set<TopicPartition>) consumer.assignment()) { + for (TopicPartition topicPartition : (Set<TopicPartition>)consumer.assignment()) { String offsetState = offsetRepository.getState(serializeOffsetKey(topicPartition)); if (offsetState != null && !offsetState.isEmpty()) { - // The state contains the last read offset so you need to seek from the next one + // The state contains the last read offset so you + // need to seek from the next one long offset = deserializeOffsetValue(offsetState) + 1; log.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset); consumer.seek(topicPartition, offset); } else { - // If the init poll has returned some data of a currently unknown topic/partition in the state - // then resume from their offset in order to avoid losing data + // If the init poll has returned some data of a + // currently unknown topic/partition in the state + // then resume from their offset in order to avoid + // losing data List<ConsumerRecord<Object, Object>> partitionRecords = 
poll.records(topicPartition); if (!partitionRecords.isEmpty()) { long offset = partitionRecords.get(0).offset(); @@ -274,12 +283,14 @@ public class KafkaConsumer extends DefaultConsumer { } else if (endpoint.getConfiguration().getSeekTo() != null) { if (endpoint.getConfiguration().getSeekTo().equals("beginning")) { log.debug("{} is seeking to the beginning on topic {}", threadId, topicName); - // This poll to ensures we have an assigned partition otherwise seek won't work + // This poll to ensures we have an assigned partition + // otherwise seek won't work consumer.poll(100); consumer.seekToBeginning(consumer.assignment()); } else if (endpoint.getConfiguration().getSeekTo().equals("end")) { log.debug("{} is seeking to the end on topic {}", threadId, topicName); - // This poll to ensures we have an assigned partition otherwise seek won't work + // This poll to ensures we have an assigned partition + // otherwise seek won't work consumer.poll(100); consumer.seekToEnd(consumer.assignment()); } @@ -304,21 +315,23 @@ public class KafkaConsumer extends DefaultConsumer { while (!breakOnErrorHit && recordIterator.hasNext()) { record = recordIterator.next(); if (log.isTraceEnabled()) { - log.trace("Partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), - record.value()); + log.trace("Partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value()); } Exchange exchange = endpoint.createKafkaExchange(record); propagateHeaders(record, exchange, endpoint.getConfiguration()); - // if not auto commit then we have additional information on the exchange + // if not auto commit then we have additional + // information on the exchange if (!isAutoCommitEnabled()) { exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, !recordIterator.hasNext()); } if (endpoint.getConfiguration().isAllowManualCommit()) { - // allow Camel users to access the Kafka consumer API to be able 
to do for example manual commits + // allow Camel users to access the Kafka + // consumer API to be able to do for example + // manual commits KafkaManualCommit manual = endpoint.getComponent().getKafkaManualCommitFactory().newInstance(exchange, consumer, topicName, threadId, - offsetRepository, partition, record.offset()); + offsetRepository, partition, record.offset()); exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual); } @@ -329,29 +342,36 @@ public class KafkaConsumer extends DefaultConsumer { } if (exchange.getException() != null) { - // processing failed due to an unhandled exception, what should we do + // processing failed due to an unhandled + // exception, what should we do if (endpoint.getConfiguration().isBreakOnFirstError()) { - // we are failing and we should break out - log.warn("Error during processing {} from topic: {}. Will seek consumer to offset: {} and re-connect and start polling again.", - exchange, topicName, partitionLastOffset); - // force commit so we resume on next poll where we failed + // we are failing and we should break + // out + log.warn("Error during processing {} from topic: {}. 
Will seek consumer to offset: {} and re-connect and start polling again.", exchange, + topicName, partitionLastOffset); + // force commit so we resume on next + // poll where we failed commitOffset(offsetRepository, partition, partitionLastOffset, true); // continue to next partition breakOnErrorHit = true; } else { - // will handle/log the exception and then continue to next + // will handle/log the exception and + // then continue to next getExceptionHandler().handleException("Error during processing", exchange, exchange.getException()); } } else { // record was success so remember its offset partitionLastOffset = record.offset(); - //lastOffsetProcessed would be used by Consumer re-balance listener to preserve offset state upon partition revoke + // lastOffsetProcessed would be used by + // Consumer re-balance listener to preserve + // offset state upon partition revoke lastProcessedOffset.put(serializeOffsetKey(partition), partitionLastOffset); } } if (!breakOnErrorHit) { - // all records processed from partition so commit them + // all records processed from partition so + // commit them commitOffset(offsetRepository, partition, partitionLastOffset, false); } } @@ -385,7 +405,8 @@ public class KafkaConsumer extends DefaultConsumer { consumer.unsubscribe(); Thread.currentThread().interrupt(); } catch (KafkaException e) { - // some kind of error in kafka, it may happen during unsubscribing or during normal processing + // some kind of error in kafka, it may happen during + // unsubscribing or during normal processing if (unsubscribing) { getExceptionHandler().handleException("Error unsubscribing " + threadId + " from kafka topic " + topicName, e); } else { @@ -415,7 +436,8 @@ public class KafkaConsumer extends DefaultConsumer { } private void shutdown() { - // As advised in the KAFKA-1894 ticket, calling this wakeup method breaks the infinite loop + // As advised in the KAFKA-1894 ticket, calling this wakeup method + // breaks the infinite loop 
consumer.wakeup(); } @@ -424,11 +446,11 @@ public class KafkaConsumer extends DefaultConsumer { log.debug("onPartitionsRevoked: {} from topic {}", threadId, topicName); StateRepository<String, String> offsetRepository = endpoint.getConfiguration().getOffsetRepository(); - for (TopicPartition partition : partitions) { + for (TopicPartition partition : partitions) { String offsetKey = serializeOffsetKey(partition); Long offset = lastProcessedOffset.get(offsetKey); if (offset == null) { - offset = -1l; + offset = -1L; } log.debug("Saving offset repository state {} from offsetKey {} with offset: {}", threadId, offsetKey, offset); commitOffset(offsetRepository, partition, offset, true); @@ -445,7 +467,8 @@ public class KafkaConsumer extends DefaultConsumer { for (TopicPartition partition : partitions) { String offsetState = offsetRepository.getState(serializeOffsetKey(partition)); if (offsetState != null && !offsetState.isEmpty()) { - // The state contains the last read offset so you need to seek from the next one + // The state contains the last read offset so you need + // to seek from the next one long offset = deserializeOffsetValue(offsetState) + 1; log.debug("Resuming partition {} from offset {} from state", partition.partition(), offset); consumer.seek(partition, offset); @@ -458,9 +481,8 @@ public class KafkaConsumer extends DefaultConsumer { private void propagateHeaders(ConsumerRecord<Object, Object> record, Exchange exchange, KafkaConfiguration kafkaConfiguration) { HeaderFilterStrategy headerFilterStrategy = kafkaConfiguration.getHeaderFilterStrategy(); KafkaHeaderDeserializer headerDeserializer = kafkaConfiguration.getKafkaHeaderDeserializer(); - StreamSupport.stream(record.headers().spliterator(), false) - .filter(header -> shouldBeFiltered(header, exchange, headerFilterStrategy)) - .forEach(header -> exchange.getIn().setHeader(header.key(), headerDeserializer.deserialize(header.key(), header.value()))); + 
StreamSupport.stream(record.headers().spliterator(), false).filter(header -> shouldBeFiltered(header, exchange, headerFilterStrategy)) + .forEach(header -> exchange.getIn().setHeader(header.key(), headerDeserializer.deserialize(header.key(), header.value()))); } private boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) { @@ -483,4 +505,3 @@ public class KafkaConsumer extends DefaultConsumer { return Long.parseLong(offset); } } - diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java index 385e658..4556f13 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java @@ -63,8 +63,8 @@ public class KafkaConsumerRebalancePartitionRevokeTest extends BaseEmbeddedKafka @Test public void ensurePartitionRevokeCallsWithLastProcessedOffset() throws Exception { boolean partitionRevokeCalled = messagesLatch.await(30000, TimeUnit.MILLISECONDS); - assertTrue("StateRepository.setState should have been called with offset >= 0 for topic" + TOPIC + - ". Remaining count : " + messagesLatch.getCount(), partitionRevokeCalled); + assertTrue("StateRepository.setState should have been called with offset >= 0 for topic" + TOPIC + + ". 
Remaining count : " + messagesLatch.getCount(), partitionRevokeCalled); } @Override @@ -92,7 +92,7 @@ public class KafkaConsumerRebalancePartitionRevokeTest extends BaseEmbeddedKafka } public class OffsetStateRepository extends MemoryStateRepository { - CountDownLatch messagesLatch = null; + CountDownLatch messagesLatch; public OffsetStateRepository(CountDownLatch messagesLatch) { this.messagesLatch = messagesLatch; @@ -114,7 +114,7 @@ public class KafkaConsumerRebalancePartitionRevokeTest extends BaseEmbeddedKafka @Override public void setState(String key, String value) { if (key.contains(TOPIC) && messagesLatch.getCount() > 0 - && Long.parseLong(value) >= 0) { + && Long.parseLong(value) >= 0) { messagesLatch.countDown(); } super.setState(key, value); diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java index 7deef47..077d689 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java @@ -44,8 +44,8 @@ public class KafkaConsumerRebalanceTest extends BaseEmbeddedKafkaTest { @Test public void offsetGetStateMustHaveBeenCalledTwice() throws Exception { boolean offsetGetStateCalled = messagesLatch.await(30000, TimeUnit.MILLISECONDS); - assertTrue("StateRepository.getState should have been called twice for topic " + TOPIC + - ". Remaining count : " + messagesLatch.getCount(), offsetGetStateCalled); + assertTrue("StateRepository.getState should have been called twice for topic " + TOPIC + + ". 
Remaining count : " + messagesLatch.getCount(), offsetGetStateCalled); } @Override @@ -73,7 +73,7 @@ public class KafkaConsumerRebalanceTest extends BaseEmbeddedKafkaTest { } public class OffsetStateRepository implements StateRepository<String, String> { - CountDownLatch messagesLatch = null; + CountDownLatch messagesLatch; public OffsetStateRepository(CountDownLatch messagesLatch) { this.messagesLatch = messagesLatch;