Repository: camel Updated Branches: refs/heads/master a9180c61f -> 3fe56aefa
Camel CDI: Better handling of CDI event endpoints Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3fe56aef Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3fe56aef Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3fe56aef Branch: refs/heads/master Commit: 3fe56aefac9d4454e80d0cca4bf001592d2c67c8 Parents: a9180c6 Author: Antonin Stefanutti <anto...@stefanutti.fr> Authored: Tue May 3 13:20:06 2016 +0200 Committer: Antonin Stefanutti <anto...@stefanutti.fr> Committed: Tue May 3 13:20:06 2016 +0200 ---------------------------------------------------------------------- .../org/apache/camel/cdi/CdiCamelExtension.java | 31 +++--- .../org/apache/camel/cdi/CdiCamelFactory.java | 47 +-------- .../org/apache/camel/cdi/CdiEventConsumer.java | 6 +- .../org/apache/camel/cdi/CdiEventEndpoint.java | 100 ++++++++++++++++--- .../org/apache/camel/cdi/CdiEventProducer.java | 2 +- .../java/org/apache/camel/cdi/CdiSpiHelper.java | 4 +- .../camel/cdi/ForwardingObserverMethod.java | 24 ++--- 7 files changed, 118 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java index 0c5c408..c26f09f 100755 --- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java @@ -82,6 +82,7 @@ import static org.apache.camel.cdi.AnyLiteral.ANY; import static org.apache.camel.cdi.ApplicationScopedLiteral.APPLICATION_SCOPED; import static org.apache.camel.cdi.BeanManagerHelper.getReference; import static org.apache.camel.cdi.BeanManagerHelper.getReferencesByType; +import static org.apache.camel.cdi.CdiEventEndpoint.eventEndpointUri; import static org.apache.camel.cdi.CdiSpiHelper.getQualifiers; import static org.apache.camel.cdi.CdiSpiHelper.getRawType; import static org.apache.camel.cdi.CdiSpiHelper.hasAnnotation; @@ -104,7 +105,7 @@ public class CdiCamelExtension implements Extension { private final Set<AnnotatedType<?>> eagerBeans = newSetFromMap(new ConcurrentHashMap<>()); - private final Map<InjectionPoint, ForwardingObserverMethod<?>> cdiEventEndpoints = new ConcurrentHashMap<>(); + private final Map<String, CdiEventEndpoint<?>> cdiEventEndpoints = new ConcurrentHashMap<>(); private final Set<Bean<?>> cdiBeans = newSetFromMap(new ConcurrentHashMap<>()); @@ -118,8 +119,8 @@ public class CdiCamelExtension implements Extension { private final Set<ImportResource> resources = newSetFromMap(new ConcurrentHashMap<>()); - ForwardingObserverMethod<?> getObserverMethod(InjectionPoint ip) { - return cdiEventEndpoints.get(ip); + CdiEventEndpoint<?> getEventEndpoint(String uri) { + return cdiEventEndpoints.get(uri); } Set<Annotation> getObserverEvents() { @@ -218,19 +219,17 @@ public class CdiCamelExtension implements Extension { cdiBeans.add(pb.getBean()); } - private void beans(@Observes ProcessBean<?> pb) { + private void beans(@Observes ProcessBean<?> pb, BeanManager manager) { cdiBeans.add(pb.getBean()); - - // TODO: refine the key to the type and qualifiers instead of the whole injection point as it leads to registering redundant observers + // Lookup for CDI event endpoint injection points pb.getBean().getInjectionPoints().stream() .filter(ip -> CdiEventEndpoint.class.equals(getRawType(ip.getType()))) .forEach(ip -> { - // TODO: refine the key to the type and qualifiers instead of the whole injection point as it leads to registering redundant observers - if (ip.getType() instanceof ParameterizedType) { - cdiEventEndpoints.put(ip, new ForwardingObserverMethod<>(((ParameterizedType) ip.getType()).getActualTypeArguments()[0], ip.getQualifiers())); - } else if (ip.getType() instanceof Class) { - cdiEventEndpoints.put(ip, new ForwardingObserverMethod<>(Object.class, ip.getQualifiers())); - } + Type type = ip.getType() instanceof ParameterizedType + ? ((ParameterizedType) ip.getType()).getActualTypeArguments()[0] + : Object.class; + String uri = eventEndpointUri(type, ip.getQualifiers()); + cdiEventEndpoints.put(uri, new CdiEventEndpoint<>(uri, type, ip.getQualifiers(), manager)); }); } @@ -298,8 +297,8 @@ public class CdiCamelExtension implements Extension { extraBeans.forEach(abd::addBean); // Update the CDI Camel factory beans - Set<Annotation> endpointQualifiers = cdiEventEndpoints.keySet().stream() - .map(InjectionPoint::getQualifiers) + Set<Annotation> endpointQualifiers = cdiEventEndpoints.values().stream() + .map(CdiEventEndpoint::getQualifiers) .flatMap(Set::stream) .collect(toSet()); Set<Annotation> templateQualifiers = contextQualifiers.stream() @@ -315,7 +314,9 @@ public class CdiCamelExtension implements Extension { .forEach(abd::addBean); // Add CDI event endpoint observer methods - cdiEventEndpoints.values().forEach(abd::addObserverMethod); + cdiEventEndpoints.values().stream() + .map(ForwardingObserverMethod::new) + .forEach(abd::addObserverMethod); } private boolean shouldDeployDefaultCamelContext(Set<Bean<?>> beans) { http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelFactory.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelFactory.java index 8644400..b0e87aa 100755 --- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelFactory.java +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelFactory.java @@ -17,17 +17,11 @@ package org.apache.camel.cdi; import java.lang.annotation.Annotation; -import java.lang.reflect.Field; -import java.lang.reflect.GenericArrayType; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.Collection; import java.util.HashSet; import java.util.Optional; -import java.util.Set; -import java.util.stream.Stream; - -import static java.util.stream.Collectors.joining; import javax.enterprise.event.Event; import javax.enterprise.inject.Any; @@ -38,7 +32,6 @@ import javax.enterprise.inject.Produces; import javax.enterprise.inject.Typed; import javax.enterprise.inject.UnsatisfiedResolutionException; import javax.enterprise.inject.spi.InjectionPoint; -import javax.enterprise.util.TypeLiteral; import org.apache.camel.CamelContext; import org.apache.camel.ConsumerTemplate; @@ -47,6 +40,7 @@ import org.apache.camel.ProducerTemplate; import org.apache.camel.TypeConverter; import org.apache.camel.component.mock.MockEndpoint; +import static org.apache.camel.cdi.CdiEventEndpoint.eventEndpointUri; import static org.apache.camel.cdi.CdiSpiHelper.isAnnotationType; import static org.apache.camel.cdi.DefaultLiteral.DEFAULT; @@ -168,20 +162,7 @@ final class CdiCamelFactory { } String uri = eventEndpointUri(type, ip.getQualifiers()); if (context.hasEndpoint(uri) == null) { - // FIXME: to be replaced once event firing with dynamic parameterized type is properly supported (see https://issues.jboss.org/browse/CDI-516) - TypeLiteral<T> literal = new TypeLiteral<T>() { - }; - for (Field field : TypeLiteral.class.getDeclaredFields()) { - if (field.getType().equals(Type.class)) { - field.setAccessible(true); - field.set(literal, type); - break; - } - } - context.addEndpoint(uri, - new CdiEventEndpoint<>( - event.select(literal, ip.getQualifiers().stream().toArray(Annotation[]::new)), - uri, context, (ForwardingObserverMethod<T>) extension.getObserverMethod(ip))); + context.addEndpoint(uri, extension.getEventEndpoint(uri)); } return context.getEndpoint(uri, CdiEventEndpoint.class); } @@ -204,30 +185,6 @@ final class CdiCamelFactory { return instance.select(qualifiers.stream().toArray(Annotation[]::new)).get(); } - private static String eventEndpointUri(Type type, Set<Annotation> qualifiers) { - return "cdi-event://" + authorityFromType(type) + qualifiers.stream() - .map(Annotation::annotationType) - .map(Class::getCanonicalName) - .collect(joining("%2C", qualifiers.size() > 0 ? "?qualifiers=" : "", "")); - } - - private static String authorityFromType(Type type) { - if (type instanceof Class) { - return Class.class.cast(type).getName(); - } - if (type instanceof ParameterizedType) { - ParameterizedType pt = (ParameterizedType) type; - return Stream.of(pt.getActualTypeArguments()) - .map(CdiCamelFactory::authorityFromType) - .collect(joining("%2C", authorityFromType(pt.getRawType()) + "%3C", "%3E")); - } - if (type instanceof GenericArrayType) { - GenericArrayType arrayType = (GenericArrayType) type; - return authorityFromType(arrayType.getGenericComponentType()) + "%5B%5D"; - } - throw new IllegalArgumentException("Cannot create URI authority for event type [" + type + "]"); - } - private static <T extends Annotation> Optional<T> getQualifierByType(InjectionPoint ip, Class<T> type) { return ip.getQualifiers().stream() .filter(isAnnotationType(type)) http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventConsumer.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventConsumer.java index f479163..b7d7861 100644 --- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventConsumer.java +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventConsumer.java @@ -24,7 +24,7 @@ import org.apache.camel.management.event.AbstractExchangeEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/* package-private */ final class CdiEventConsumer<T> extends DefaultConsumer { +final class CdiEventConsumer<T> extends DefaultConsumer { private final Logger logger = LoggerFactory.getLogger(CdiEventConsumer.class); @@ -38,12 +38,12 @@ import org.slf4j.LoggerFactory; @Override protected void doStart() throws Exception { super.doStart(); - endpoint.registerConsumer(this); + endpoint.addConsumer(this); } @Override protected void doStop() throws Exception { - endpoint.unregisterConsumer(this); + endpoint.removeConsumer(this); super.doStop(); } http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventEndpoint.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventEndpoint.java index adfda4f..ece2bcb 100644 --- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventEndpoint.java +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventEndpoint.java @@ -16,11 +16,26 @@ */ package org.apache.camel.cdi; +import java.lang.annotation.Annotation; +import java.lang.reflect.Field; +import java.lang.reflect.GenericArrayType; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.util.ArrayList; import java.util.List; +import java.util.Set; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.joining; + +import javax.enterprise.context.spi.CreationalContext; import javax.enterprise.event.Event; +import javax.enterprise.inject.Any; +import javax.enterprise.inject.spi.BeanManager; +import javax.enterprise.inject.spi.InjectionTarget; +import javax.enterprise.util.TypeLiteral; +import javax.inject.Inject; -import org.apache.camel.CamelContext; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Processor; @@ -83,21 +98,82 @@ public final class CdiEventEndpoint<T> extends DefaultEndpoint { private final List<CdiEventConsumer<T>> consumers = new ArrayList<>(); - private final Event<T> event; + private final Type type; + + private final Set<Annotation> qualifiers; + + private final BeanManager manager; + + CdiEventEndpoint(String endpointUri, Type type, Set<Annotation> qualifiers, BeanManager manager) { + super(endpointUri); + this.type = type; + this.qualifiers = qualifiers; + this.manager = manager; + } + + static String eventEndpointUri(Type type, Set<Annotation> qualifiers) { + return "cdi-event://" + authorityFromType(type) + qualifiers.stream() + .map(CdiSpiHelper::createAnnotationId) + .collect(joining("%2C", qualifiers.size() > 0 ? "?qualifiers=" : "", "")); + } + + private static String authorityFromType(Type type) { + if (type instanceof Class) { + return Class.class.cast(type).getName(); + } + if (type instanceof ParameterizedType) { + return Stream.of(((ParameterizedType) type).getActualTypeArguments()) + .map(CdiEventEndpoint::authorityFromType) + .collect(joining("%2C", authorityFromType(((ParameterizedType) type).getRawType()) + "%3C", "%3E")); + } + if (type instanceof GenericArrayType) { + return authorityFromType(((GenericArrayType) type).getGenericComponentType()) + "%5B%5D"; + } + + throw new IllegalArgumentException("Cannot create URI authority for event type [" + type + "]"); + } + + Set<Annotation> getQualifiers() { + return qualifiers; + } - CdiEventEndpoint(Event<T> event, String endpointUri, CamelContext context, ForwardingObserverMethod<T> observer) { - super(endpointUri, context); - this.event = event; - observer.setObserver(this); + Type getType() { + return type; } + @Override public Consumer createConsumer(Processor processor) { return new CdiEventConsumer<>(this, processor); } @Override - public Producer createProducer() { - return new CdiEventProducer<>(this, event); + public Producer createProducer() throws IllegalAccessException { + // FIXME: to be replaced once event firing with dynamic parameterized type + // is properly supported (see https://issues.jboss.org/browse/CDI-516) + TypeLiteral<T> literal = new TypeLiteral<T>() { + }; + for (Field field : TypeLiteral.class.getDeclaredFields()) { + if (field.getType().equals(Type.class)) { + field.setAccessible(true); + field.set(literal, type); + break; + } + } + + InjectionTarget<AnyEvent> target = manager.createInjectionTarget(manager.createAnnotatedType(AnyEvent.class)); + CreationalContext<AnyEvent> ctx = manager.createCreationalContext(null); + AnyEvent instance = target.produce(ctx); + target.inject(instance, ctx); + return new CdiEventProducer<>(this, instance.event + .select(literal, qualifiers.stream().toArray(Annotation[]::new))); + } + + @Vetoed + private static class AnyEvent { + + @Any + @Inject + private Event<Object> event; } @Override @@ -105,13 +181,13 @@ public final class CdiEventEndpoint<T> extends DefaultEndpoint { return true; } - void registerConsumer(CdiEventConsumer<T> consumer) { + void addConsumer(CdiEventConsumer<T> consumer) { synchronized (consumers) { consumers.add(consumer); } } - void unregisterConsumer(CdiEventConsumer<T> consumer) { + void removeConsumer(CdiEventConsumer<T> consumer) { synchronized (consumers) { consumers.remove(consumer); } @@ -119,9 +195,7 @@ public final class CdiEventEndpoint<T> extends DefaultEndpoint { void notify(T t) { synchronized (consumers) { - for (CdiEventConsumer<T> consumer : consumers) { - consumer.notify(t); - } + consumers.forEach(consumer -> consumer.notify(t)); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventProducer.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventProducer.java index 11cb274..5ada4db 100644 --- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventProducer.java +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventProducer.java @@ -23,7 +23,7 @@ import org.apache.camel.impl.DefaultProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/* package-private */ final class CdiEventProducer<T> extends DefaultProducer { +final class CdiEventProducer<T> extends DefaultProducer { private final Logger logger = LoggerFactory.getLogger(CdiEventProducer.class); http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiSpiHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiSpiHelper.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiSpiHelper.java index 89e8b87..5071719 100644 --- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiSpiHelper.java +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiSpiHelper.java @@ -47,6 +47,7 @@ import javax.enterprise.inject.spi.AnnotatedMethod; import javax.enterprise.inject.spi.AnnotatedType; import javax.enterprise.inject.spi.Bean; import javax.enterprise.inject.spi.BeanManager; +import javax.enterprise.util.Nonbinding; import static org.apache.camel.cdi.AnyLiteral.ANY; import static org.apache.camel.cdi.DefaultLiteral.DEFAULT; @@ -202,11 +203,12 @@ final class CdiSpiHelper { /** * Generates a unique signature for an {@link Annotation}. */ - private static String createAnnotationId(Annotation annotation) { + static String createAnnotationId(Annotation annotation) { Method[] methods = doPrivileged( (PrivilegedAction<Method[]>) () -> annotation.annotationType().getDeclaredMethods()); return Stream.of(methods) + .filter(method -> !method.isAnnotationPresent(Nonbinding.class)) .sorted(comparing(Method::getName)) .collect(() -> new StringJoiner(",", "@" + annotation.annotationType().getCanonicalName() + "(", ")"), (joiner, method) -> { http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/ForwardingObserverMethod.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/ForwardingObserverMethod.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/ForwardingObserverMethod.java index b0f8a32..77394b4 100644 --- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/ForwardingObserverMethod.java +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/ForwardingObserverMethod.java @@ -19,7 +19,6 @@ package org.apache.camel.cdi; import java.lang.annotation.Annotation; import java.lang.reflect.Type; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import javax.enterprise.event.Reception; import javax.enterprise.event.TransactionPhase; import javax.enterprise.inject.spi.ObserverMethod; @@ -28,19 +27,10 @@ import org.apache.camel.CamelContext; final class ForwardingObserverMethod<T> implements ObserverMethod<T> { - private final AtomicReference<CdiEventEndpoint<T>> observer = new AtomicReference<>(); + private final CdiEventEndpoint<T> endpoint; - private final Type type; - - private final Set<Annotation> qualifiers; - - ForwardingObserverMethod(Type type, Set<Annotation> qualifiers) { - this.type = type; - this.qualifiers = qualifiers; - } - - void setObserver(CdiEventEndpoint<T> observer) { - this.observer.set(observer); + ForwardingObserverMethod(CdiEventEndpoint<T> endpoint) { + this.endpoint = endpoint; } @Override @@ -50,12 +40,12 @@ final class ForwardingObserverMethod<T> implements ObserverMethod<T> { @Override public Type getObservedType() { - return type; + return endpoint.getType(); } @Override public Set<Annotation> getObservedQualifiers() { - return qualifiers; + return endpoint.getQualifiers(); } @Override @@ -70,8 +60,6 @@ final class ForwardingObserverMethod<T> implements ObserverMethod<T> { @Override public void notify(T event) { - if (observer.get() != null) { - observer.get().notify(event); - } + endpoint.notify(event); } }