This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new c329be1c53b CAMEL-21679: Fix azure-servicebus configuration validation so that individual client options can be used c329be1c53b is described below commit c329be1c53b6d93f4c5eee73026f8fe1a917fab9 Author: James Netherton <jamesnether...@gmail.com> AuthorDate: Wed Jan 29 12:01:06 2025 +0000 CAMEL-21679: Fix azure-servicebus configuration validation so that individual client options can be used --- .../azure/servicebus/ServiceBusComponent.java | 11 ----- .../azure/servicebus/ServiceBusConsumer.java | 6 +++ .../azure/servicebus/ServiceBusProducer.java | 1 + .../azure/servicebus/ServiceBusUtils.java | 13 ++++++ .../azure/servicebus/ServiceBusEndpointTest.java | 29 +++++++++--- .../azure/servicebus/ServiceBusUtilsTest.java | 51 ++++++++++++++++++++++ 6 files changed, 94 insertions(+), 17 deletions(-) diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusComponent.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusComponent.java index 8957b1abda7..0f1af11f787 100644 --- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusComponent.java +++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusComponent.java @@ -58,7 +58,6 @@ public class ServiceBusComponent extends DefaultComponent { final ServiceBusEndpoint endpoint = new ServiceBusEndpoint(uri, this, configuration); setProperties(endpoint, parameters); setCredentials(configuration); - validateConfigurations(configuration); return endpoint; } @@ -86,14 +85,4 @@ public class ServiceBusComponent extends DefaultComponent { public void setConfiguration(ServiceBusConfiguration configuration) { this.configuration = configuration; } - - private void validateConfigurations(final ServiceBusConfiguration configuration) { - if (configuration.getProcessorClient() == null || configuration.getSenderClient() == null) { - if (ObjectHelper.isEmpty(configuration.getConnectionString()) && - ObjectHelper.isEmpty(configuration.getFullyQualifiedNamespace())) { - throw new IllegalArgumentException( - "Azure ServiceBus ConnectionString or FQNS must be specified."); - } - } - } } diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java index 01a4c543474..3c32ad9f2dd 100644 --- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java +++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java @@ -47,6 +47,12 @@ public class ServiceBusConsumer extends DefaultConsumer { super(endpoint, processor); } + @Override + protected void doInit() throws Exception { + super.doInit(); + ServiceBusUtils.validateConfiguration(getConfiguration(), true); + } + @Override protected void doStart() throws Exception { super.doStart(); diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java index 9dc3b58f1d2..5169ef67324 100644 --- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java +++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java @@ -58,6 +58,7 @@ public class ServiceBusProducer extends DefaultProducer { @Override protected void doInit() throws Exception { super.doInit(); + ServiceBusUtils.validateConfiguration(getConfiguration(), false); configurationOptionsProxy = new ServiceBusConfigurationOptionsProxy(getConfiguration()); } diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java index 7d553b94b54..db4a1a1ef7a 100644 --- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java +++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java @@ -61,4 +61,17 @@ public final class ServiceBusUtils { .map(obj -> createServiceBusMessage(obj, applicationProperties, correlationId, sessionId)) .collect(Collectors.toList()); } + + public static void validateConfiguration(final ServiceBusConfiguration configuration, final boolean isConsumer) { + final boolean customClientAbsent + = isConsumer ? configuration.getProcessorClient() == null : configuration.getSenderClient() == null; + if (customClientAbsent && isConnectionStringOrFQNSAbsent(configuration)) { + throw new IllegalArgumentException("Azure ServiceBus ConnectionString or FQNS must be specified."); + } + } + + static boolean isConnectionStringOrFQNSAbsent(final ServiceBusConfiguration configuration) { + return ObjectHelper.isEmpty(configuration.getConnectionString()) + && ObjectHelper.isEmpty(configuration.getFullyQualifiedNamespace()); + } } diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointTest.java index a2b291f849f..fdc10d7b7b2 100644 --- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointTest.java +++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointTest.java @@ -19,11 +19,15 @@ package org.apache.camel.component.azure.servicebus; import java.util.HashMap; import java.util.Map; +import com.azure.core.credential.AccessToken; import com.azure.core.credential.TokenCredential; +import com.azure.core.credential.TokenRequestContext; import com.azure.identity.DefaultAzureCredentialBuilder; +import org.apache.camel.FailedToCreateProducerException; import org.apache.camel.ResolveEndpointFailedException; import org.apache.camel.test.junit5.CamelTestSupport; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; import static org.junit.jupiter.api.Assertions.*; @@ -31,15 +35,28 @@ class ServiceBusEndpointTest extends CamelTestSupport { @Test void testCreateWithInvalidData() { - assertThrows(ResolveEndpointFailedException.class, - () -> context.getEndpoint("azure-servicebus:test//?")); + Exception exception = assertThrows(FailedToCreateProducerException.class, () -> { + template.sendBody("azure-servicebus:test//?", null); + }); + assertInstanceOf(IllegalArgumentException.class, exception.getCause()); - assertThrows(ResolveEndpointFailedException.class, - () -> context.getEndpoint("azure-servicebus://?connectionString=test")); + exception = assertThrows(ResolveEndpointFailedException.class, () -> { + template.sendBody("azure-servicebus://?connectionString=test", null); + }); + assertInstanceOf(IllegalArgumentException.class, exception.getCause()); // provided credential but no fully qualified namespace - assertThrows(ResolveEndpointFailedException.class, - () -> context.getEndpoint("azure-servicebus:test?tokenCredential=credential")); + context.getRegistry().bind("credential", new TokenCredential() { + @Override + public Mono<AccessToken> getToken(TokenRequestContext tokenRequestContext) { + return Mono.empty(); + } + }); + + exception = assertThrows(FailedToCreateProducerException.class, () -> { + template.sendBody("azure-servicebus:test?tokenCredential=#credential", null); + }); + assertInstanceOf(IllegalArgumentException.class, exception.getCause()); } @Test diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java index 0a97080d37e..5c80d003363 100644 --- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java +++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java @@ -22,7 +22,10 @@ import java.util.LinkedList; import java.util.List; import java.util.stream.StreamSupport; +import com.azure.messaging.servicebus.ServiceBusClientBuilder; import com.azure.messaging.servicebus.ServiceBusMessage; +import com.azure.messaging.servicebus.ServiceBusProcessorClient; +import com.azure.messaging.servicebus.ServiceBusSenderClient; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.*; @@ -133,4 +136,52 @@ public class ServiceBusUtilsTest { assertTrue(StreamSupport.stream(busMessages2.spliterator(), false) .anyMatch(record -> record.getSessionId().equals("session-2"))); } + + @Test + void validateConfigurationMissingCredentials() { + assertThrows(IllegalArgumentException.class, + () -> ServiceBusUtils.validateConfiguration(new ServiceBusConfiguration(), false)); + } + + @Test + void validateConfigurationConnectionStringProvided() { + ServiceBusConfiguration configuration = new ServiceBusConfiguration(); + configuration.setConnectionString("test"); + assertDoesNotThrow(() -> ServiceBusUtils.validateConfiguration(configuration, false)); + } + + @Test + void validateConfigurationFQNSProvided() { + ServiceBusConfiguration configuration = new ServiceBusConfiguration(); + configuration.setFullyQualifiedNamespace("test"); + assertDoesNotThrow(() -> ServiceBusUtils.validateConfiguration(configuration, false)); + } + + @Test + void validateConfigurationCustomProcessorClient() { + ServiceBusConfiguration configuration = new ServiceBusConfiguration(); + ServiceBusProcessorClient client = new ServiceBusClientBuilder() + .connectionString("Endpoint=sb://camel.apache.org/;SharedAccessKeyName=test;SharedAccessKey=test") + .processor() + .queueName("test") + .processMessage(serviceBusReceivedMessageContext -> { + }) + .processError(serviceBusErrorContext -> { + }) + .buildProcessorClient(); + configuration.setProcessorClient(client); + assertDoesNotThrow(() -> ServiceBusUtils.validateConfiguration(configuration, true)); + } + + @Test + void validateConfigurationCustomSenderClient() { + ServiceBusConfiguration configuration = new ServiceBusConfiguration(); + ServiceBusSenderClient client = new ServiceBusClientBuilder() + .connectionString("Endpoint=sb://camel.apache.org/;SharedAccessKeyName=test;SharedAccessKey=test") + .sender() + .queueName("test") + .buildClient(); + configuration.setSenderClient(client); + assertDoesNotThrow(() -> ServiceBusUtils.validateConfiguration(configuration, false)); + } }