This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 10de4bab2e76046b9535128bf38bf67b80c3f3f4 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Fri Sep 20 12:38:40 2019 +0200 Camel-Pulsar: Fixed CS --- .../camel/component/pulsar/PulsarComponent.java | 12 ++- .../camel/component/pulsar/PulsarEndpoint.java | 14 ++-- .../component/pulsar/PulsarMessageListener.java | 6 +- .../component/pulsar/PulsarMessageReceipt.java | 17 ++-- .../pulsar/PulsarMessageReceiptFactory.java | 9 ++- .../camel/component/pulsar/PulsarProducer.java | 24 ++---- .../pulsar/configuration/PulsarConfiguration.java | 92 +++++++++++----------- .../camel/component/pulsar/utils/PulsarPath.java | 2 +- .../consumers/CommonCreationStrategyImpl.java | 5 +- .../pulsar/utils/message/PulsarMessageUtils.java | 2 +- .../pulsar/PulsarConcurrentConsumerInTest.java | 4 +- .../pulsar/PulsarConcurrentProducerInTest.java | 4 +- .../pulsar/PulsarConsumerAcknowledgementTest.java | 31 ++------ .../component/pulsar/PulsarConsumerInTest.java | 19 ++--- .../PulsarConsumerNoAcknowledgementTest.java | 22 ++---- .../pulsar/PulsarCustomMessageReceiptTest.java | 27 ++----- .../component/pulsar/PulsarProducerInTest.java | 16 +--- .../camel/component/pulsar/PulsarTestSupport.java | 4 +- .../component/pulsar/utils/PulsarUtilsTest.java | 4 +- 19 files changed, 134 insertions(+), 180 deletions(-) diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java index b43ab1d..08e6f65 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java @@ -108,9 +108,12 @@ public class PulsarComponent extends DefaultComponent { /** * Whether to allow manual message acknowledgements. * <p/> - * If this option is enabled, then messages are not immediately acknowledged after being consumed. - * Instead, an instance of {@link PulsarMessageReceipt} is stored as a header on the {@link org.apache.camel.Exchange}. - * Messages can then be acknowledged using {@link PulsarMessageReceipt} at any time before the ackTimeout occurs. + * If this option is enabled, then messages are not immediately acknowledged + * after being consumed. Instead, an instance of + * {@link PulsarMessageReceipt} is stored as a header on the + * {@link org.apache.camel.Exchange}. Messages can then be acknowledged + * using {@link PulsarMessageReceipt} at any time before the ackTimeout + * occurs. */ public void setAllowManualAcknowledgement(boolean allowManualAcknowledgement) { this.allowManualAcknowledgement = allowManualAcknowledgement; @@ -121,7 +124,8 @@ public class PulsarComponent extends DefaultComponent { } /** - * Provide a factory to create an alternate implementation of {@link PulsarMessageReceipt}. + * Provide a factory to create an alternate implementation of + * {@link PulsarMessageReceipt}. */ public void setPulsarMessageReceiptFactory(PulsarMessageReceiptFactory pulsarMessageReceiptFactory) { this.pulsarMessageReceiptFactory = pulsarMessageReceiptFactory; diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java index 0330677..5ab8bbe 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java @@ -34,13 +34,17 @@ public class PulsarEndpoint extends DefaultEndpoint { private PulsarClient pulsarClient; private String uri; - @UriPath(enums = "persistent,non-persistent") @Metadata(required = true) + @UriPath(enums = "persistent,non-persistent") + @Metadata(required = true) private String persistence; - @UriPath @Metadata(required = true) + @UriPath + @Metadata(required = true) private String tenant; - @UriPath @Metadata(required = true) + @UriPath + @Metadata(required = true) private String namespace; - @UriPath @Metadata(required = true) + @UriPath + @Metadata(required = true) private String topic; @UriParam private PulsarConfiguration pulsarConfiguration; @@ -140,6 +144,6 @@ public class PulsarEndpoint extends DefaultEndpoint { @Override public PulsarComponent getComponent() { - return (PulsarComponent) super.getComponent(); + return (PulsarComponent)super.getComponent(); } } diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java index 55249fc..11e87fb 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java @@ -47,8 +47,7 @@ public class PulsarMessageListener implements MessageListener<byte[]> { try { if (endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) { - exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT, - endpoint.getComponent().getPulsarMessageReceiptFactory().newInstance(exchange, message, consumer)); + exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT, endpoint.getComponent().getPulsarMessageReceiptFactory().newInstance(exchange, message, consumer)); processor.process(exchange); } else { processor.process(exchange); @@ -60,8 +59,7 @@ public class PulsarMessageListener implements MessageListener<byte[]> { } private void handleProcessorException(final Exchange exchange, final Exception exception) { - final Exchange exchangeWithException = PulsarMessageUtils - .updateExchangeWithException(exception, exchange); + final Exchange exchangeWithException = PulsarMessageUtils.updateExchangeWithException(exception, exchange); exceptionHandler.handleException("An error occurred", exchangeWithException, exception); } diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java index 7cdfd90..bb2f849 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java @@ -25,8 +25,10 @@ import org.apache.pulsar.client.api.PulsarClientException; /** * Acknowledge the receipt of a message using the Pulsar consumer. * <p> - * Available on the {@link Exchange} if {@link PulsarConfiguration#isAllowManualAcknowledgement()} is true. - * An alternative to the default may be provided by implementing {@link PulsarMessageReceiptFactory}. + * Available on the {@link Exchange} if + * {@link PulsarConfiguration#isAllowManualAcknowledgement()} is true. An + * alternative to the default may be provided by implementing + * {@link PulsarMessageReceiptFactory}. */ public interface PulsarMessageReceipt { @@ -38,7 +40,8 @@ public interface PulsarMessageReceipt { void acknowledge() throws PulsarClientException; /** - * Acknowledge receipt of all of the messages in the stream up to and including this message synchronously. + * Acknowledge receipt of all of the messages in the stream up to and + * including this message synchronously. * * @see org.apache.pulsar.client.api.Consumer#acknowledgeCumulative(MessageId) */ @@ -52,7 +55,8 @@ public interface PulsarMessageReceipt { CompletableFuture<Void> acknowledgeAsync(); /** - * Acknowledge receipt of all of the messages in the stream up to and including this message asynchronously. + * Acknowledge receipt of all of the messages in the stream up to and + * including this message asynchronously. * * @see org.apache.pulsar.client.api.Consumer#acknowledgeCumulativeAsync(MessageId) */ @@ -62,9 +66,10 @@ public interface PulsarMessageReceipt { * Acknowledge the failure to process this message. * * @see org.apache.pulsar.client.api.Consumer#negativeAcknowledge(MessageId) - * Note: Available in Puslar 2.4.0. Implementations with earlier versions should return an {@link java.lang.UnsupportedOperationException}. + * Note: Available in Puslar 2.4.0. Implementations with earlier + * versions should return an + * {@link java.lang.UnsupportedOperationException}. */ void negativeAcknowledge(); } - diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java index 314ae95..0a09723 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java @@ -21,10 +21,13 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; /** - * Factory to create a new {@link PulsarMessageReceipt} to store on the {@link Exchange}. + * Factory to create a new {@link PulsarMessageReceipt} to store on the + * {@link Exchange}. * <p> - * Implement this interface if an alternate implementation of {@link PulsarMessageReceipt} is required - * as newer Pulsar clients may have acknowledgement functionality not yet supported by {@link DefaultPulsarMessageReceipt}. + * Implement this interface if an alternate implementation of + * {@link PulsarMessageReceipt} is required as newer Pulsar clients may have + * acknowledgement functionality not yet supported by + * {@link DefaultPulsarMessageReceipt}. */ public interface PulsarMessageReceiptFactory { diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java index d7c8b0a..5503949 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java @@ -44,8 +44,7 @@ public class PulsarProducer extends DefaultProducer { final Message message = exchange.getIn(); byte[] body; try { - body = exchange.getContext().getTypeConverter() - .mandatoryConvertTo(byte[].class, exchange, message.getBody()); + body = exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, exchange, message.getBody()); } catch (NoTypeConversionAvailableException | TypeConversionException exception) { // fallback to try serialize the data body = PulsarMessageUtils.serialize(message.getBody()); @@ -61,24 +60,15 @@ public class PulsarProducer extends DefaultProducer { if (producerName == null) { producerName = topicUri + "-" + Thread.currentThread().getId(); } - final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint - .getPulsarClient() - .newProducer() - .producerName(producerName) - .topic(topicUri) - .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS) - .blockIfQueueFull(configuration.isBlockIfQueueFull()) - .maxPendingMessages(configuration.getMaxPendingMessages()) - .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions()) - .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS) - .batchingMaxMessages(configuration.getMaxPendingMessages()) - .enableBatching(configuration.isBatchingEnabled()) - .initialSequenceId(configuration.getInitialSequenceId()) - .compressionType(configuration.getCompressionType()); + final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().producerName(producerName).topic(topicUri) + .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS).blockIfQueueFull(configuration.isBlockIfQueueFull()) + .maxPendingMessages(configuration.getMaxPendingMessages()).maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions()) + .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS).batchingMaxMessages(configuration.getMaxPendingMessages()) + .enableBatching(configuration.isBatchingEnabled()).initialSequenceId(configuration.getInitialSequenceId()).compressionType(configuration.getCompressionType()); if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) { producerBuilder.messageRouter(configuration.getMessageRouter()); } else { - producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode()); + producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode()); } producer = producerBuilder.create(); } diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java index ea34312..b60bfed 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java @@ -55,11 +55,10 @@ public class PulsarConfiguration { private int sendTimeoutMs = 30000; @UriParam(label = "producer", description = "Whether to block the producing thread if pending messages queue is full or to throw a ProducerQueueIsFullError", defaultValue = "false") private boolean blockIfQueueFull; - @UriParam(label = "producer", description = "Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true", - defaultValue = "1000") + @UriParam(label = "producer", description = "Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true", defaultValue = "1000") private int maxPendingMessages = 1000; @UriParam(label = "producer", description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if " - + "(number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000") + + "(number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000") private int maxPendingMessagesAcrossPartitions = 50000; @UriParam(label = "producer", description = "The maximum time period within which the messages sent will be batched if batchingEnabled is true.", defaultValue = "1000") private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1); @@ -92,7 +91,8 @@ public class PulsarConfiguration { } /** - * Type of the subscription [EXCLUSIVE|SHARED|FAILOVER], defaults to EXCLUSIVE + * Type of the subscription [EXCLUSIVE|SHARED|FAILOVER], defaults to + * EXCLUSIVE */ public void setSubscriptionType(SubscriptionType subscriptionType) { this.subscriptionType = subscriptionType; @@ -147,7 +147,8 @@ public class PulsarConfiguration { } /** - * Prefix to add to consumer names when a SHARED or FAILOVER subscription is used + * Prefix to add to consumer names when a SHARED or FAILOVER subscription is + * used */ public void setConsumerNamePrefix(String consumerNamePrefix) { this.consumerNamePrefix = consumerNamePrefix; @@ -160,9 +161,12 @@ public class PulsarConfiguration { /** * Whether to allow manual message acknowledgements. * <p/> - * If this option is enabled, then messages are not immediately acknowledged after being consumed. - * Instead, an instance of {@link PulsarMessageReceipt} is stored as a header on the {@link org.apache.camel.Exchange}. - * Messages can then be acknowledged using {@link PulsarMessageReceipt} at any time before the ackTimeout occurs. + * If this option is enabled, then messages are not immediately acknowledged + * after being consumed. Instead, an instance of + * {@link PulsarMessageReceipt} is stored as a header on the + * {@link org.apache.camel.Exchange}. Messages can then be acknowledged + * using {@link PulsarMessageReceipt} at any time before the ackTimeout + * occurs. */ public void setAllowManualAcknowledgement(boolean allowManualAcknowledgement) { this.allowManualAcknowledgement = allowManualAcknowledgement; @@ -184,15 +188,15 @@ public class PulsarConfiguration { } /** - * Group the consumer acknowledgments for the specified time in milliseconds - defaults to 100 + * Group the consumer acknowledgments for the specified time in milliseconds + * - defaults to 100 */ public void setAckGroupTimeMillis(long ackGroupTimeMillis) { this.ackGroupTimeMillis = ackGroupTimeMillis; } /** - * Send timeout in milliseconds. - * Defaults to 30,000ms (30 seconds) + * Send timeout in milliseconds. Defaults to 30,000ms (30 seconds) */ public void setSendTimeoutMs(int sendTimeoutMs) { this.sendTimeoutMs = sendTimeoutMs; @@ -203,10 +207,10 @@ public class PulsarConfiguration { } /** - * Set whether the send and asyncSend operations should block when the outgoing message queue is full. - * If set to false, send operations will immediately fail with ProducerQueueIsFullError when there is no space left - * in the pending queue. - * Default is false. + * Set whether the send and asyncSend operations should block when the + * outgoing message queue is full. If set to false, send operations will + * immediately fail with ProducerQueueIsFullError when there is no space + * left in the pending queue. Default is false. */ public void setBlockIfQueueFull(boolean blockIfQueueFull) { this.blockIfQueueFull = blockIfQueueFull; @@ -217,8 +221,8 @@ public class PulsarConfiguration { } /** - * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. - * Default is 1000. + * Set the max size of the queue holding the messages pending to receive an + * acknowledgment from the broker. Default is 1000. */ public void setMaxPendingMessages(int maxPendingMessages) { this.maxPendingMessages = maxPendingMessages; @@ -229,8 +233,8 @@ public class PulsarConfiguration { } /** - * Set the number of max pending messages across all the partitions. - * Default is 50000. + * Set the number of max pending messages across all the partitions. Default + * is 50000. */ public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) { this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions; @@ -241,11 +245,12 @@ public class PulsarConfiguration { } /** - * Set the time period within which the messages sent will be batched if batch messages are - * enabled. If set to a non zero value, messages will be queued until either: + * Set the time period within which the messages sent will be batched if + * batch messages are enabled. If set to a non zero value, messages will be + * queued until either: * <ul> - * <li>this time interval expires</li> - * <li>the max number of messages in a batch is reached + * <li>this time interval expires</li> + * <li>the max number of messages in a batch is reached * </ul> * Default is 1ms. */ @@ -258,8 +263,7 @@ public class PulsarConfiguration { } /** - * Set the maximum number of messages permitted in a batch. - * Default 1,000. + * Set the maximum number of messages permitted in a batch. Default 1,000. */ public void setBatchingMaxMessages(int batchingMaxMessages) { this.batchingMaxMessages = batchingMaxMessages; @@ -270,8 +274,8 @@ public class PulsarConfiguration { } /** - * Control whether automatic batching of messages is enabled for the producer. - * Default is true. + * Control whether automatic batching of messages is enabled for the + * producer. Default is true. */ public void setBatchingEnabled(boolean batchingEnabled) { this.batchingEnabled = batchingEnabled; @@ -282,9 +286,10 @@ public class PulsarConfiguration { } /** - * Set the baseline for the sequence ids for messages published by the producer. - * First message will be using (initialSequenceId 1) as its sequence id and subsequent messages will be assigned - * incremental sequence ids, if not otherwise specified. + * Set the baseline for the sequence ids for messages published by the + * producer. First message will be using (initialSequenceId 1) as its + * sequence id and subsequent messages will be assigned incremental sequence + * ids, if not otherwise specified. */ public void setInitialSequenceId(long initialSequenceId) { this.initialSequenceId = initialSequenceId; @@ -315,24 +320,23 @@ public class PulsarConfiguration { /** * Set the message routing mode for the producer. */ - public MessageRoutingMode getMessageRoutingMode() { - return messageRoutingMode; - } + public MessageRoutingMode getMessageRoutingMode() { + return messageRoutingMode; + } - public void setMessageRoutingMode(MessageRoutingMode messageRoutingMode) { - this.messageRoutingMode = messageRoutingMode; - } + public void setMessageRoutingMode(MessageRoutingMode messageRoutingMode) { + this.messageRoutingMode = messageRoutingMode; + } /** * Set a custom Message Router. */ - public MessageRouter getMessageRouter() { - return messageRouter; - } - - public void setMessageRouter(MessageRouter messageRouter) { - this.messageRouter = messageRouter; - } - + public MessageRouter getMessageRouter() { + return messageRouter; + } + + public void setMessageRouter(MessageRouter messageRouter) { + this.messageRouter = messageRouter; + } } diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java index 1af739f..eb3b63e 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java @@ -58,4 +58,4 @@ public class PulsarPath { public boolean isAutoConfigurable() { return autoConfigurable; } -} \ No newline at end of file +} diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java index a96e369..de2ff9c 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java @@ -24,7 +24,7 @@ import org.apache.camel.component.pulsar.configuration.PulsarConfiguration; import org.apache.pulsar.client.api.ConsumerBuilder; public final class CommonCreationStrategyImpl { - + private CommonCreationStrategyImpl() { } @@ -32,8 +32,7 @@ public final class CommonCreationStrategyImpl { final PulsarConfiguration endpointConfiguration = pulsarEndpoint.getPulsarConfiguration(); return pulsarEndpoint.getPulsarClient().newConsumer().topic(pulsarEndpoint.getUri()).subscriptionName(endpointConfiguration.getSubscriptionName()) - .receiverQueueSize(endpointConfiguration.getConsumerQueueSize()).consumerName(name) - .ackTimeout(endpointConfiguration.getAckTimeoutMillis(), TimeUnit.MILLISECONDS) + .receiverQueueSize(endpointConfiguration.getConsumerQueueSize()).consumerName(name).ackTimeout(endpointConfiguration.getAckTimeoutMillis(), TimeUnit.MILLISECONDS) .acknowledgmentGroupTime(endpointConfiguration.getAckGroupTimeMillis(), TimeUnit.MILLISECONDS) .messageListener(new PulsarMessageListener(pulsarEndpoint, pulsarConsumer.getExceptionHandler(), pulsarConsumer.getProcessor())); } diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageUtils.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageUtils.java index 8332c2a..860337f 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageUtils.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageUtils.java @@ -35,7 +35,7 @@ import static org.apache.camel.component.pulsar.utils.message.PulsarMessageHeade import static org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders.TOPIC_NAME; public final class PulsarMessageUtils { - + private PulsarMessageUtils() { } diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentConsumerInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentConsumerInTest.java index a4bb747..c859d30 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentConsumerInTest.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentConsumerInTest.java @@ -45,7 +45,7 @@ public class PulsarConcurrentConsumerInTest extends PulsarTestSupport { private static final int NUMBER_OF_CONSUMERS = 5; @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=5&subscriptionType=Shared" - + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerNamePrefix=camel-consumer-") + + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerNamePrefix=camel-consumer-") private Endpoint from; @EndpointInject("mock:result") @@ -72,7 +72,7 @@ public class PulsarConcurrentConsumerInTest extends PulsarTestSupport { @Override protected Registry createCamelRegistry() throws Exception { SimpleRegistry registry = new SimpleRegistry(); - + registerPulsarBeans(registry); return registry; diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentProducerInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentProducerInTest.java index d6aac83..d3b4622 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentProducerInTest.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentProducerInTest.java @@ -43,7 +43,7 @@ public class PulsarConcurrentProducerInTest extends PulsarTestSupport { private ProducerTemplate producerTemplate; @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=3&subscriptionType=Shared" + "&subscriptionName=camel-subscription&consumerQueueSize=5" - + "&consumerNamePrefix=camel-consumer" + "&producerName=" + PRODUCER) + + "&consumerNamePrefix=camel-consumer" + "&producerName=" + PRODUCER) private Endpoint from; @EndpointInject("mock:result") @@ -64,7 +64,7 @@ public class PulsarConcurrentProducerInTest extends PulsarTestSupport { @Override protected Registry createCamelRegistry() throws Exception { SimpleRegistry registry = new SimpleRegistry(); - + registerPulsarBeans(registry); return registry; diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java index 0ce2dcc..e3195b3 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java @@ -42,12 +42,8 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport { private static final String TOPIC_URI = "persistent://public/default/camel-topic"; private static final String PRODUCER = "camel-producer-1"; - @EndpointInject(uri = "pulsar:" + TOPIC_URI - + "?numberOfConsumers=1&subscriptionType=Exclusive" - + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" - + "&allowManualAcknowledgement=true" - + "&ackTimeoutMillis=1000" - ) + @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive" + + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000") private Endpoint from; @EndpointInject(uri = "mock:result") @@ -58,10 +54,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport { @Before public void setup() throws Exception { context.removeRoute("myRoute"); - producer = givenPulsarClient().newProducer(Schema.STRING) - .producerName(PRODUCER) - .topic(TOPIC_URI) - .create(); + producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create(); } @Override @@ -85,11 +78,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport { } private PulsarClient givenPulsarClient() throws PulsarClientException { - return new ClientBuilderImpl() - .serviceUrl(getPulsarBrokerUrl()) - .ioThreads(1) - .listenerThreads(1) - .build(); + return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build(); } @Test @@ -102,8 +91,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport { from(from).routeId("myRoute").to(to).process(exchange -> { LOGGER.info("Processing message {}", exchange.getIn().getBody()); - PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn() - .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); + PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); receipt.acknowledge(); }); } @@ -124,8 +112,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport { from(from).routeId("myRoute").to(to).process(exchange -> { LOGGER.info("Processing message {}", exchange.getIn().getBody()); - PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn() - .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); + PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); try { CompletableFuture<Void> f = receipt.acknowledgeAsync(); f.get(); @@ -151,8 +138,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport { from(from).routeId("myRoute").to(to).process(exchange -> { LOGGER.info("Processing message {}", exchange.getIn().getBody()); - PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn() - .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); + PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); // Ack the second message. The first will also be acked. if (exchange.getIn().getBody().equals("Hello World Again!")) { receipt.acknowledgeCumulative(); @@ -177,8 +163,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport { from(from).routeId("myRoute").to(to).process(exchange -> { LOGGER.info("Processing message {}", exchange.getIn().getBody()); - PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn() - .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); + PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); // Ack the second message. The first will also be acked. if (exchange.getIn().getBody().equals("Hello World Again!")) { try { diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInTest.java index 2590290..8ff9e91 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInTest.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInTest.java @@ -43,10 +43,8 @@ public class PulsarConsumerInTest extends PulsarTestSupport { private static final String TOPIC_URI = "persistent://public/default/camel-topic"; private static final String PRODUCER = "camel-producer-1"; - @EndpointInject("pulsar:" + TOPIC_URI - + "?numberOfConsumers=1&subscriptionType=Exclusive" - + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" - ) + @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive" + + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer") private Endpoint from; @EndpointInject("mock:result") @@ -73,7 +71,7 @@ public class PulsarConsumerInTest extends PulsarTestSupport { @Override protected Registry createCamelRegistry() throws Exception { SimpleRegistry registry = new SimpleRegistry(); - + registerPulsarBeans(registry); return registry; @@ -92,21 +90,14 @@ public class PulsarConsumerInTest extends PulsarTestSupport { } private PulsarClient givenPulsarClient() throws PulsarClientException { - return new ClientBuilderImpl() - .serviceUrl(getPulsarBrokerUrl()) - .ioThreads(1) - .listenerThreads(1) - .build(); + return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build(); } @Test public void testAMessageToClusterIsConsumed() throws Exception { to.expectedMessageCount(1); - Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING) - .producerName(PRODUCER) - .topic(TOPIC_URI) - .create(); + Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create(); producer.send("Hello World!"); diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java index 1cb5527..f57b4a5 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java @@ -36,11 +36,8 @@ public class PulsarConsumerNoAcknowledgementTest extends PulsarTestSupport { private static final String TOPIC_URI = "persistent://public/default/camel-topic"; private static final String PRODUCER = "camel-producer-1"; - @EndpointInject(uri = "pulsar:" + TOPIC_URI - + "?numberOfConsumers=1&subscriptionType=Exclusive" - + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" - + "&ackTimeoutMillis=1000" - ) + @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive" + + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&ackTimeoutMillis=1000") private Endpoint from; @EndpointInject(uri = "mock:result") @@ -74,26 +71,21 @@ public class PulsarConsumerNoAcknowledgementTest extends PulsarTestSupport { PulsarComponent comp = new PulsarComponent(context); comp.setAutoConfiguration(autoConfiguration); comp.setPulsarClient(pulsarClient); - comp.setAllowManualAcknowledgement(true); // Set to true here instead of the endpoint query parameter. + comp.setAllowManualAcknowledgement(true); // Set to true here instead of + // the endpoint query + // parameter. jndi.bind("pulsar", comp); } private PulsarClient givenPulsarClient() throws PulsarClientException { - return new ClientBuilderImpl() - .serviceUrl(getPulsarBrokerUrl()) - .ioThreads(1) - .listenerThreads(1) - .build(); + return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build(); } @Test public void testAMessageIsConsumedMultipleTimes() throws Exception { to.expectedMinimumMessageCount(2); - Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING) - .producerName(PRODUCER) - .topic(TOPIC_URI) - .create(); + Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create(); producer.send("Hello World!"); diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java index 24a9132..eea202e 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java @@ -53,12 +53,8 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport { public PulsarMessageReceipt mockPulsarMessageReceipt = mock(PulsarMessageReceipt.class); - @EndpointInject(uri = "pulsar:" + TOPIC_URI - + "?numberOfConsumers=1&subscriptionType=Exclusive" - + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" - + "&allowManualAcknowledgement=true" - + "&ackTimeoutMillis=1000" - ) + @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive" + + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000") private Endpoint from; @EndpointInject(uri = "mock:result") @@ -68,10 +64,7 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport { @Before public void setup() throws Exception { - producer = givenPulsarClient().newProducer(Schema.STRING) - .producerName(PRODUCER) - .topic(TOPIC_URI) - .create(); + producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create(); } @Override @@ -97,11 +90,7 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport { } private PulsarClient givenPulsarClient() throws PulsarClientException { - return new ClientBuilderImpl() - .serviceUrl(getPulsarBrokerUrl()) - .ioThreads(1) - .listenerThreads(1) - .build(); + return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build(); } @Test @@ -116,8 +105,7 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport { from(from).routeId("myRoute").to(to).process(exchange -> { LOGGER.info("Processing message {}", exchange.getIn().getBody()); - PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn() - .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); + PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); receipt.acknowledge(); }); } @@ -127,10 +115,11 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport { MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to); - // The mock does not actually acknowledge using the pulsar consumer, so the message will be re-consumed + // The mock does not actually acknowledge using the pulsar consumer, so + // the message will be re-consumed // after the ackTimeout. verify(mockPulsarMessageReceipt, atLeast(2)).acknowledge(); verifyNoMoreInteractions(mockPulsarMessageReceipt); } - + } diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerInTest.java index c66ca1c..52b274e 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerInTest.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerInTest.java @@ -40,12 +40,8 @@ public class PulsarProducerInTest extends PulsarTestSupport { @Produce("direct:start") private ProducerTemplate producerTemplate; - @EndpointInject("pulsar:" + TOPIC_URI - + "?numberOfConsumers=1&subscriptionType=Exclusive" - + "&subscriptionName=camel-subscription&consumerQueueSize=1" - + "&consumerName=camel-consumer" - + "&producerName=" + PRODUCER - ) + @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive" + "&subscriptionName=camel-subscription&consumerQueueSize=1" + + "&consumerName=camel-consumer" + "&producerName=" + PRODUCER) private Endpoint from; @EndpointInject("mock:result") @@ -66,7 +62,7 @@ public class PulsarProducerInTest extends PulsarTestSupport { @Override protected Registry createCamelRegistry() throws Exception { SimpleRegistry registry = new SimpleRegistry(); - + registerPulsarBeans(registry); return registry; @@ -85,11 +81,7 @@ public class PulsarProducerInTest extends PulsarTestSupport { } private PulsarClient givenPulsarClient() throws PulsarClientException { - return new ClientBuilderImpl() - .serviceUrl(getPulsarBrokerUrl()) - .ioThreads(1) - .listenerThreads(1) - .build(); + return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build(); } @Test diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java index c49f46a..03124db 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java @@ -34,9 +34,7 @@ public class PulsarTestSupport extends ContainerAwareTestSupport { } public static GenericContainer pulsarContainer() { - return new GenericContainer(CONTAINER_IMAGE) - .withNetworkAliases(CONTAINER_NAME) - .withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT) + return new GenericContainer(CONTAINER_IMAGE).withNetworkAliases(CONTAINER_NAME).withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT) .withCommand("/pulsar/bin/pulsar", "standalone", "--no-functions-worker", "-nss") .waitingFor(Wait.forHttp(WAIT_FOR_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT)); } diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/PulsarUtilsTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/PulsarUtilsTest.java index b93ba10..b4c9dad 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/PulsarUtilsTest.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/PulsarUtilsTest.java @@ -66,8 +66,8 @@ public class PulsarUtilsTest { doThrow(new PulsarClientException("A Pulsar Client exception occurred")).when(consumer).close(); consumer.close(); - + verify(consumer).unsubscribe(); verify(consumer).close(); } -} \ No newline at end of file +}