This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new a2f92c0b139 CAMEL-18549: Changed filter list in the DynamicRouterProcessor to a m… (#8431) a2f92c0b139 is described below commit a2f92c0b139dbab0f15fb1e01288d74d4b7ce0a1 Author: Steve Storck <steve...@gmail.com> AuthorDate: Mon Sep 26 04:37:35 2022 -0400 CAMEL-18549: Changed filter list in the DynamicRouterProcessor to a m… (#8431) * CAMEL-18549: Changed filter list in the DynamicRouterProcessor to a map, where filters are mapped by filter ID. * CAMEL-18549: Fixed checkstyle error for import out of order. * CAMEL-18549: Attempting to fix build errors. --- .../apache/camel/catalog/schemas/camel-spring.xsd | 2 +- .../dynamicrouter/DynamicRouterProcessor.java | 47 +++++++++------------- .../dynamicrouter/DynamicRouterProcessorTest.java | 16 +++++++- 3 files changed, 33 insertions(+), 32 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd index 2efde78948e..dfe0517239e 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd @@ -489,7 +489,7 @@ Enriches a message with data from a secondary resource <xs:annotation> <xs:documentation xml:lang="en"> <![CDATA[ -Camel error handling. +Error handler settings ]]> </xs:documentation> </xs:annotation> diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java index 5509a2f30d3..c82e201d4ad 100644 --- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java +++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessor.java @@ -16,10 +16,7 @@ */ package org.apache.camel.component.dynamicrouter; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -65,9 +62,10 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra private static final String LOG_ENDPOINT = "log:%s.%s?level=%s&showAll=true&multiline=true"; /** - * {@link FilterProcessor}s to determine if the incoming exchange should be routed, based on the content. + * {@link FilterProcessor}s, mapped by subscription ID, to determine if the incoming exchange should be routed based + * on the content. */ - private final ArrayList<PrioritizedFilterProcessor> filters; + private final TreeMap<String, PrioritizedFilterProcessor> filterMap; /** * The camel context. @@ -130,7 +128,7 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra final boolean warnDroppedMessage, final Supplier<PrioritizedFilterProcessorFactory> filterProcessorFactorySupplier) { this.id = id; - this.filters = new ArrayList<>(); + this.filterMap = new TreeMap<>(); this.camelContext = camelContext; this.recipientMode = recipientMode; this.producerTemplate = camelContext.createProducerTemplate(); @@ -196,10 +194,9 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra * @param filter the filter to add */ public void addFilter(final PrioritizedFilterProcessor filter) { - synchronized (filters) { + synchronized (filterMap) { if (filter != null) { - filters.add(filter); - filters.sort(PrioritizedFilterProcessor.COMPARATOR); + filterMap.put(filter.getId(), filter); LOG.debug("Added subscription: {}", filter); } } @@ -212,10 +209,7 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra * @return the filter with the supplied ID, or null */ public PrioritizedFilterProcessor getFilter(final String filterId) { - return filters.stream() - .filter(f -> filterId.equals(f.getId())) - .findFirst() - .orElse(null); + return filterMap.get(filterId); } /** @@ -224,22 +218,17 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra * @param filterId the ID of the filter to remove */ public void removeFilter(final String filterId) { - synchronized (filters) { - PrioritizedFilterProcessor toRemove = filters.stream() - .filter(f -> filterId.equals(f.getId())) - .findFirst() - .orElse(null); - Optional.ofNullable(toRemove) - .ifPresent(f -> { - if (filters.remove(f)) { - LOG.debug("Removed subscription: {}", f); - } - }); + synchronized (filterMap) { + Optional.ofNullable(filterMap.remove(filterId)) + .ifPresentOrElse( + f -> LOG.debug("Removed subscription: {}", f), + () -> LOG.debug("No subscription exists with ID: {}", filterId)); } } /** - * Match the exchange against all {@link #filters} to determine if any of them are suitable to handle the exchange. + * Match the exchange against all {@link #filterMap} to determine if any of them are suitable to handle the + * exchange. * * @param exchange the message exchange * @return list of filters that match for the exchange; if "firstMatch" mode, it is a singleton list of @@ -247,7 +236,7 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra */ List<PrioritizedFilterProcessor> matchFilters(final Exchange exchange) { return Optional.of( - filters.stream() + filterMap.values().stream() .filter(f -> f.matches(exchange)) .limit(MODE_FIRST_MATCH.equals(recipientMode) ? 1 : Integer.MAX_VALUE) .collect(Collectors.toList())) @@ -257,8 +246,8 @@ public class DynamicRouterProcessor extends AsyncProcessorSupport implements Tra /** * Processes the message exchange, where the caller supports having the exchange asynchronously processed. The - * exchange is matched against all {@link #filters} to determine if any of them are suitable to handle the exchange. - * When the first suitable filter is found, it processes the exchange. + * exchange is matched against all {@link #filterMap} to determine if any of them are suitable to handle the + * exchange. When the first suitable filter is found, it processes the exchange. * <p/> * If there was any failure in processing, then the caused {@link Exception} would be set on the {@link Exchange}. * diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java index d7e0ba15bd8..8ecf81f0dcd 100644 --- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java +++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/DynamicRouterProcessorTest.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.dynamicrouter; +import java.util.List; + import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.component.dynamicrouter.support.DynamicRouterTestSupport; @@ -23,7 +25,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.MODE_FIRST_MATCH; +import static org.apache.camel.component.dynamicrouter.DynamicRouterConstants.MODE_ALL_MATCH; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.lenient; @@ -34,7 +36,7 @@ class DynamicRouterProcessorTest extends DynamicRouterTestSupport { @BeforeEach void localSetup() throws Exception { super.setup(); - processor = new DynamicRouterProcessor(PROCESSOR_ID, context, MODE_FIRST_MATCH, false, () -> filterProcessorFactory); + processor = new DynamicRouterProcessor(PROCESSOR_ID, context, MODE_ALL_MATCH, false, () -> filterProcessorFactory); processor.doInit(); } @@ -59,6 +61,16 @@ class DynamicRouterProcessorTest extends DynamicRouterTestSupport { assertEquals(filterProcessor, result); } + @Test + void addMultipleFiltersWithSameId() { + processor.addFilter(filterProcessor); + processor.addFilter(filterProcessor); + processor.addFilter(filterProcessor); + processor.addFilter(filterProcessor); + List<PrioritizedFilterProcessor> matchingFilters = processor.matchFilters(exchange); + assertEquals(1, matchingFilters.size()); + } + @Test void removeFilter() { addFilterAsFilterProcessor();