This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-4.8.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.8.x by this push: new c2607e3583e CAMEL-21211: Azure Service Bus - filter unknown header types (#15555) c2607e3583e is described below commit c2607e3583ee873d2a1de3a9b947692e9cecccf3 Author: Dylan Piergies <dylan.pierg...@gmail.com> AuthorDate: Mon Sep 16 05:45:34 2024 +0100 CAMEL-21211: Azure Service Bus - filter unknown header types (#15555) --- .../servicebus/ServiceBusHeaderFilterStrategy.java | 28 ++++++++-- .../ServiceBusHeaderFilterStrategyTest.java | 63 ++++++++++++++++++++++ .../integration/ServiceBusProducerIT.java | 54 ++++++++++--------- 3 files changed, 116 insertions(+), 29 deletions(-) diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusHeaderFilterStrategy.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusHeaderFilterStrategy.java index 5ab2a97e08a..4ea6a0f0ba6 100644 --- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusHeaderFilterStrategy.java +++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusHeaderFilterStrategy.java @@ -16,16 +16,36 @@ */ package org.apache.camel.component.azure.servicebus; +import java.util.Date; +import java.util.Set; +import java.util.UUID; + +import org.apache.camel.Exchange; import org.apache.camel.support.DefaultHeaderFilterStrategy; public class ServiceBusHeaderFilterStrategy extends DefaultHeaderFilterStrategy { + private static final Set<Class<?>> SUPPORTED_TYPES = Set.of( + Boolean.class, + Byte.class, + Character.class, + Double.class, + Float.class, + Integer.class, + Long.class, + Short.class, + String.class, + Date.class, + UUID.class); + public ServiceBusHeaderFilterStrategy() { super(); - initialise(); + setOutFilterStartsWith(DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH); + setInFilterStartsWith(DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH); } - private void initialise() { - setOutFilterStartsWith("Camel", "camel", "org.apache.camel."); - setInFilterStartsWith("Camel", "camel", "org.apache.camel."); + @Override + public boolean applyFilterToCamelHeaders(String headerName, Object headerValue, Exchange exchange) { + return headerValue == null || !SUPPORTED_TYPES.contains(headerValue.getClass()) + || super.applyFilterToCamelHeaders(headerName, headerValue, exchange); } } diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusHeaderFilterStrategyTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusHeaderFilterStrategyTest.java new file mode 100644 index 00000000000..d54f22811af --- /dev/null +++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusHeaderFilterStrategyTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.azure.servicebus; + +import java.util.Date; +import java.util.UUID; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import static org.assertj.core.api.Assertions.assertThat; + +class ServiceBusHeaderFilterStrategyTest { + private final ServiceBusHeaderFilterStrategy headerFilterStrategy = new ServiceBusHeaderFilterStrategy(); + + @ParameterizedTest + @ArgumentsSource(HeaderArgumentsProvider.class) + void testApplyFilterToCamelHeadersPassesKnownTypes(String headerName, Object headerValue) { + assertThat(headerFilterStrategy.applyFilterToCamelHeaders(headerName, headerValue, null)).isFalse(); + } + + @Test + void testApplyFilterToCamelHeadersFiltersUnknownType() { + assertThat(headerFilterStrategy.applyFilterToCamelHeaders("objectHeader", new Object(), null)).isTrue(); + } + + static class HeaderArgumentsProvider implements ArgumentsProvider { + @Override + public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception { + return Stream.of( + Arguments.of("booleanHeader", true), + Arguments.of("byteHeader", (byte) 1), + Arguments.of("characterHeader", '1'), + Arguments.of("doubleHeader", 1.0D), + Arguments.of("floatHeader", 1.0F), + Arguments.of("integerHeader", 1), + Arguments.of("longHeader", 1L), + Arguments.of("shortHeader", (short) 1), + Arguments.of("stringHeader", "stringHeader"), + Arguments.of("timestampHeader", new Date()), + Arguments.of("uuidHeader", UUID.randomUUID())); + } + } +} diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerIT.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerIT.java index bca480746d2..66f01442cfa 100644 --- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerIT.java +++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerIT.java @@ -17,10 +17,7 @@ package org.apache.camel.component.azure.servicebus.integration; import java.time.OffsetDateTime; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -45,9 +42,23 @@ public class ServiceBusProducerIT extends BaseServiceBusTestSupport { private static final String DIRECT_SEND_TO_QUEUE_URI = "direct:sendToQueue"; private static final String DIRECT_SEND_TO_TOPIC_URI = "direct:sendToTopic"; private static final String DIRECT_SEND_SCHEDULED_URI = "direct:sendScheduled"; - private static final String PROPAGATED_HEADER_KEY = "PropagatedCustomHeader"; - private static final String PROPAGATED_HEADER_VALUE = "propagated header value"; + private static final Map<String, Object> PROPAGATED_HEADERS = new HashMap<>(); private static final Pattern MESSAGE_BODY_PATTERN = Pattern.compile("^message-[0-4]$"); + + static { + PROPAGATED_HEADERS.put("booleanHeader", true); + PROPAGATED_HEADERS.put("byteHeader", (byte) 1); + PROPAGATED_HEADERS.put("characterHeader", '1'); + PROPAGATED_HEADERS.put("doubleHeader", 1.0D); + PROPAGATED_HEADERS.put("floatHeader", 1.0F); + PROPAGATED_HEADERS.put("integerHeader", 1); + PROPAGATED_HEADERS.put("longHeader", 1L); + PROPAGATED_HEADERS.put("shortHeader", (short) 1); + PROPAGATED_HEADERS.put("stringHeader", "stringHeader"); + PROPAGATED_HEADERS.put("timestampHeader", new Date()); + PROPAGATED_HEADERS.put("uuidHeader", UUID.randomUUID()); + } + private ProducerTemplate producerTemplate; @BeforeEach @@ -85,8 +96,7 @@ public class ServiceBusProducerIT extends BaseServiceBusTestSupport { client.start(); for (int i = 0; i < 5; i++) { String message = "message-" + i; - producerTemplate.sendBodyAndHeader(DIRECT_SEND_TO_QUEUE_URI, message, PROPAGATED_HEADER_KEY, - PROPAGATED_HEADER_VALUE); + producerTemplate.sendBodyAndHeaders(DIRECT_SEND_TO_QUEUE_URI, message, PROPAGATED_HEADERS); } assertTrue(messageLatch.await(3000, TimeUnit.MILLISECONDS)); @@ -96,8 +106,7 @@ public class ServiceBusProducerIT extends BaseServiceBusTestSupport { String messageBody = message.getBody().toString(); assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches()); Map<String, Object> applicationProperties = message.getApplicationProperties(); - assertEquals(1, applicationProperties.size()); - assertEquals(PROPAGATED_HEADER_VALUE, applicationProperties.get(PROPAGATED_HEADER_KEY)); + assertEquals(PROPAGATED_HEADERS, applicationProperties); }); } } @@ -114,7 +123,7 @@ public class ServiceBusProducerIT extends BaseServiceBusTestSupport { } producerTemplate.send(DIRECT_SEND_TO_QUEUE_URI, exchange -> { - exchange.getIn().setHeader(PROPAGATED_HEADER_KEY, PROPAGATED_HEADER_VALUE); + exchange.getIn().setHeaders(PROPAGATED_HEADERS); exchange.getIn().setBody(messageBatch); }); @@ -125,8 +134,7 @@ public class ServiceBusProducerIT extends BaseServiceBusTestSupport { String messageBody = message.getBody().toString(); assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches()); Map<String, Object> applicationProperties = message.getApplicationProperties(); - assertEquals(1, applicationProperties.size()); - assertEquals(PROPAGATED_HEADER_VALUE, applicationProperties.get(PROPAGATED_HEADER_KEY)); + assertEquals(PROPAGATED_HEADERS, applicationProperties); }); } } @@ -138,8 +146,7 @@ public class ServiceBusProducerIT extends BaseServiceBusTestSupport { client.start(); for (int i = 0; i < 5; i++) { String message = "message-" + i; - producerTemplate.sendBodyAndHeader(DIRECT_SEND_TO_TOPIC_URI, message, PROPAGATED_HEADER_KEY, - PROPAGATED_HEADER_VALUE); + producerTemplate.sendBodyAndHeaders(DIRECT_SEND_TO_TOPIC_URI, message, PROPAGATED_HEADERS); } assertTrue(messageLatch.await(3000, TimeUnit.MILLISECONDS)); @@ -149,8 +156,7 @@ public class ServiceBusProducerIT extends BaseServiceBusTestSupport { String messageBody = message.getBody().toString(); assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches()); Map<String, Object> applicationProperties = message.getApplicationProperties(); - assertEquals(1, applicationProperties.size()); - assertEquals(PROPAGATED_HEADER_VALUE, applicationProperties.get(PROPAGATED_HEADER_KEY)); + assertEquals(PROPAGATED_HEADERS, applicationProperties); }); } } @@ -162,8 +168,7 @@ public class ServiceBusProducerIT extends BaseServiceBusTestSupport { client.start(); for (int i = 0; i < 5; i++) { String message = "message-" + i; - Map<String, Object> headers = new HashMap<>(); - headers.put(PROPAGATED_HEADER_KEY, PROPAGATED_HEADER_VALUE); + Map<String, Object> headers = new HashMap<>(PROPAGATED_HEADERS); headers.put(ServiceBusConstants.SCHEDULED_ENQUEUE_TIME, OffsetDateTime.now().plusSeconds(1)); producerTemplate.sendBodyAndHeaders(DIRECT_SEND_SCHEDULED_URI, message, headers); } @@ -175,8 +180,7 @@ public class ServiceBusProducerIT extends BaseServiceBusTestSupport { String messageBody = message.getBody().toString(); assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches()); Map<String, Object> applicationProperties = message.getApplicationProperties(); - assertEquals(1, applicationProperties.size()); - assertEquals(PROPAGATED_HEADER_VALUE, applicationProperties.get(PROPAGATED_HEADER_KEY)); + assertEquals(PROPAGATED_HEADERS, applicationProperties); assertInstanceOf(OffsetDateTime.class, message.getScheduledEnqueueTime()); }); } @@ -194,8 +198,9 @@ public class ServiceBusProducerIT extends BaseServiceBusTestSupport { } producerTemplate.send(DIRECT_SEND_SCHEDULED_URI, exchange -> { - exchange.getIn().setHeader(ServiceBusConstants.SCHEDULED_ENQUEUE_TIME, OffsetDateTime.now().plusSeconds(1)); - exchange.getIn().setHeader(PROPAGATED_HEADER_KEY, PROPAGATED_HEADER_VALUE); + Map<String, Object> headers = new HashMap<>(PROPAGATED_HEADERS); + headers.put(ServiceBusConstants.SCHEDULED_ENQUEUE_TIME, OffsetDateTime.now().plusSeconds(1)); + exchange.getIn().setHeaders(headers); exchange.getIn().setBody(messageBatch); }); @@ -206,8 +211,7 @@ public class ServiceBusProducerIT extends BaseServiceBusTestSupport { String messageBody = message.getBody().toString(); assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches()); Map<String, Object> applicationProperties = message.getApplicationProperties(); - assertEquals(1, applicationProperties.size()); - assertEquals(PROPAGATED_HEADER_VALUE, applicationProperties.get(PROPAGATED_HEADER_KEY)); + assertEquals(PROPAGATED_HEADERS, applicationProperties); assertInstanceOf(OffsetDateTime.class, message.getScheduledEnqueueTime()); }); }