Repository: camel Updated Branches: refs/heads/master 685ffd5d7 -> df062eaf4
Refactored API Consumer to support non-ScheduledPollConsumer using ApiConsumerHelper, moved getThreadProfileName() from Producer to Endpoint, to allow Consumers to access component executor service Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/df062eaf Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/df062eaf Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/df062eaf Branch: refs/heads/master Commit: df062eaf46894b85049f4aa043fdf4121c788771 Parents: 685ffd5 Author: Dhiraj Bokde <dhira...@yahoo.com> Authored: Tue Jul 1 09:25:29 2014 -0700 Committer: Dhiraj Bokde <dhira...@yahoo.com> Committed: Tue Jul 1 09:26:01 2014 -0700 ---------------------------------------------------------------------- .../util/component/AbstractApiConsumer.java | 129 +++++-------------- .../util/component/AbstractApiEndpoint.java | 118 +++++++++++++---- .../util/component/AbstractApiProducer.java | 69 +++------- .../camel/util/component/ApiConsumerHelper.java | 128 ++++++++++++++++++ .../component/ApiMethodPropertiesHelper.java | 3 +- .../util/component/PropertiesInterceptor.java | 32 +++++ .../component/PropertyNamesInterceptor.java | 32 +++++ .../camel/util/component/ResultInterceptor.java | 39 ++++++ .../src/main/java/__name__Endpoint.java | 5 + .../src/main/java/__name__Producer.java | 5 - 10 files changed, 381 insertions(+), 179 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiConsumer.java b/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiConsumer.java index fca429f..b0f50e8 100644 --- a/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiConsumer.java @@ -16,12 +16,8 @@ */ package org.apache.camel.util.component; -import java.lang.reflect.Array; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; @@ -35,7 +31,8 @@ import org.slf4j.LoggerFactory; /** * Abstract base class for API Component Consumers. */ -public abstract class AbstractApiConsumer<E extends Enum<E> & ApiName, T> extends ScheduledPollConsumer { +public abstract class AbstractApiConsumer<E extends Enum<E> & ApiName, T> + extends ScheduledPollConsumer implements PropertyNamesInterceptor, PropertiesInterceptor, ResultInterceptor { // logger protected final Logger log = LoggerFactory.getLogger(getClass()); @@ -43,31 +40,17 @@ public abstract class AbstractApiConsumer<E extends Enum<E> & ApiName, T> extend // API Endpoint protected final AbstractApiEndpoint<E, T> endpoint; - // helpers - protected final ApiMethodPropertiesHelper<T> propertiesHelper; - protected final ApiMethodHelper<? extends ApiMethod> methodHelper; - // API method to invoke protected final ApiMethod method; - // properties used to invoke - protected final Map<String, Object> endpointProperties; + // split Array or Collection API method results into multiple Exchanges + private boolean splitResult = true; public AbstractApiConsumer(AbstractApiEndpoint<E, T> endpoint, Processor processor) { super(endpoint, processor); this.endpoint = endpoint; - - // cache helpers - this.methodHelper = endpoint.getMethodHelper(); - this.propertiesHelper = endpoint.getPropertiesHelper(); - - // get endpoint properties in a map - final HashMap<String, Object> properties = new HashMap<String, Object>(); - propertiesHelper.getEndpointProperties(endpoint.getConfiguration(), properties); - this.endpointProperties = Collections.unmodifiableMap(properties); - - this.method = findMethod(); + this.method = ApiConsumerHelper.findMethod(endpoint, this); } @Override @@ -76,83 +59,35 @@ public abstract class AbstractApiConsumer<E extends Enum<E> & ApiName, T> extend return false; } - private ApiMethod findMethod() { - - ApiMethod result; - // find one that takes the largest subset of endpoint parameters - final Set<String> argNames = new HashSet<String>(); - argNames.addAll(propertiesHelper.getEndpointPropertyNames(endpoint.getConfiguration())); - - interceptPropertyNames(argNames); - - final String[] argNamesArray = argNames.toArray(new String[argNames.size()]); - List<ApiMethod> filteredMethods = ApiMethodHelper.filterMethods( - endpoint.getCandidates(), ApiMethodHelper.MatchType.SUPER_SET, argNamesArray); - - if (filteredMethods.isEmpty()) { - throw new IllegalArgumentException( - String.format("Missing properties for %s/%s, need one or more from %s", - endpoint.getApiName().getName(), endpoint.getMethodName(), - methodHelper.getMissingProperties(endpoint.getMethodName(), argNames))); - } else if (filteredMethods.size() == 1) { - // single match - result = filteredMethods.get(0); - } else { - result = ApiMethodHelper.getHighestPriorityMethod(filteredMethods); - log.warn("Using highest priority operation {} from operations {}", method, filteredMethods); - } - - return result; - } - @Override protected int poll() throws Exception { // invoke the consumer method final Map<String, Object> args = new HashMap<String, Object>(); - args.putAll(endpointProperties); + args.putAll(endpoint.getEndpointProperties()); // let the endpoint and the Consumer intercept properties endpoint.interceptProperties(args); interceptProperties(args); try { + Object result = doInvokeMethod(args); + return ApiConsumerHelper.getResultsProcessed(this, result, isSplitResult()); - // process result according to type - if (result != null && (result instanceof Collection || result.getClass().isArray())) { - // create an exchange for every element - final Object array = getResultAsArray(result); - final int length = Array.getLength(array); - for (int i = 0; i < length; i++) { - processResult(Array.get(array, i)); - } - return length; - } else { - processResult(result); - return 1; // number of messages polled - } } catch (Throwable t) { throw ObjectHelper.wrapRuntimeCamelException(t); } } - /** - * Intercept property names used to find Consumer method. - * Used to add any custom/hidden method arguments, which MUST be provided in interceptProperties() override. - * @param propertyNames argument names. - */ + @Override @SuppressWarnings("unused") - protected void interceptPropertyNames(Set<String> propertyNames) { + public void interceptPropertyNames(Set<String> propertyNames) { // do nothing by default } - /** - * Intercept method invocation arguments used to find and invoke API method. - * Can be overridden to add custom/hidden method arguments. - * @param properties method invocation arguments. - */ + @Override @SuppressWarnings("unused") - protected void interceptProperties(Map<String, Object> properties) { + public void interceptProperties(Map<String, Object> properties) { // do nothing by default } @@ -167,33 +102,24 @@ public abstract class AbstractApiConsumer<E extends Enum<E> & ApiName, T> extend return ApiMethodHelper.invokeMethod(endpoint.getApiProxy(method, args), method, args); } - private void processResult(Object result) throws Exception { - Exchange exchange = getEndpoint().createExchange(); - exchange.getIn().setBody(result); - - interceptResult(exchange); - try { - // send message to next processor in the route - getProcessor().process(exchange); - } finally { - // log exception if an exception occurred and was not handled - final Exception exception = exchange.getException(); - if (exception != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exception); - } + @Override + public Object splitResult(Object result) { + // process result according to type + if (splitResult && result != null && (result instanceof Collection || result.getClass().isArray())) { + // create an exchange for every element + return getResultAsArray(result); + } else { + return result; } } - /** - * Derived classes can do additional result exchange processing, for example, adding custom headers. - * @param resultExchange result as a Camel exchange. - */ + @Override @SuppressWarnings("unused") - protected void interceptResult(Exchange resultExchange) { + public void interceptResult(Object result, Exchange resultExchange) { // do nothing by default } - private Object getResultAsArray(Object result) { + private static Object getResultAsArray(Object result) { if (result.getClass().isArray()) { // no conversion needed return result; @@ -202,4 +128,13 @@ public abstract class AbstractApiConsumer<E extends Enum<E> & ApiName, T> extend Collection<?> collection = (Collection<?>) result; return collection.toArray(new Object[collection.size()]); } + + public final boolean isSplitResult() { + return splitResult; + } + + @SuppressWarnings("unused") + public final void setSplitResult(boolean splitResult) { + this.splitResult = splitResult; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiEndpoint.java b/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiEndpoint.java index 260796d..68a9df6 100644 --- a/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiEndpoint.java @@ -18,13 +18,19 @@ package org.apache.camel.util.component; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import org.apache.camel.CamelContext; import org.apache.camel.Component; import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.ExecutorServiceManager; +import org.apache.camel.spi.ThreadPoolProfile; import org.apache.camel.spi.UriParam; import org.apache.camel.util.EndpointHelper; import org.apache.camel.util.ObjectHelper; @@ -34,7 +40,11 @@ import org.slf4j.LoggerFactory; /** * Abstract base class for API Component Endpoints. */ -public abstract class AbstractApiEndpoint<E extends ApiName, T> extends DefaultEndpoint { +public abstract class AbstractApiEndpoint<E extends ApiName, T> + extends DefaultEndpoint implements PropertyNamesInterceptor, PropertiesInterceptor { + + // thread pool executor with Endpoint Class name as keys + private static Map<String, ExecutorService> executorServiceMap = new ConcurrentHashMap<String, ExecutorService>(); // logger protected final Logger log = LoggerFactory.getLogger(getClass()); @@ -59,6 +69,13 @@ public abstract class AbstractApiEndpoint<E extends ApiName, T> extends DefaultE // candidate methods based on method name and endpoint configuration private List<ApiMethod> candidates; + // cached Executor service + private ExecutorService executorService; + + // cached property names and values + private Set<String> endpointPropertyNames; + private Map<String, Object> endpointProperties; + public AbstractApiEndpoint(String endpointUri, Component component, E apiName, String methodName, ApiMethodHelper<? extends ApiMethod> methodHelper, T endpointConfiguration) { super(endpointUri, component); @@ -106,11 +123,18 @@ public abstract class AbstractApiEndpoint<E extends ApiName, T> extends DefaultE /** * Initialize endpoint state, including endpoint arguments, find candidate methods, etc. */ - protected void initState() { + private void initState() { + + // compute endpoint property names and values + this.endpointPropertyNames = Collections.unmodifiableSet( + getPropertiesHelper().getEndpointPropertyNames(configuration)); + final HashMap<String, Object> properties = new HashMap<String, Object>(); + getPropertiesHelper().getEndpointProperties(configuration, properties); + this.endpointProperties = Collections.unmodifiableMap(properties); // get endpoint property names final Set<String> arguments = new HashSet<String>(); - arguments.addAll(getPropertiesHelper().getEndpointPropertyNames(getConfiguration())); + arguments.addAll(endpointPropertyNames); // add inBody argument for producers if (inBody != null) { @@ -124,6 +148,7 @@ public abstract class AbstractApiEndpoint<E extends ApiName, T> extends DefaultE // create a list of candidate methods candidates = new ArrayList<ApiMethod>(); candidates.addAll(methodHelper.getCandidateMethods(methodName, argNames)); + candidates = Collections.unmodifiableList(candidates); // error if there are no candidates if (candidates.isEmpty()) { @@ -141,24 +166,15 @@ public abstract class AbstractApiEndpoint<E extends ApiName, T> extends DefaultE } } - /** - * Intercept property names used to find Consumer and Producer methods. - * Used to add any custom/hidden method arguments, which MUST be provided in interceptProperties() override - * either in Endpoint, or Consumer and Producer. - * @param propertyNames argument names. - */ + @Override @SuppressWarnings("unused") - protected void interceptPropertyNames(Set<String> propertyNames) { + public void interceptPropertyNames(Set<String> propertyNames) { // do nothing by default } - /** - * Intercept method invocation arguments used to find and invoke API method. Called by Consumer and Producer. - * Must be overridden if also overriding interceptPropertyName() to add custom/hidden method properties. - * @param properties method invocation arguments. - */ + @Override @SuppressWarnings("unused") - protected void interceptProperties(Map<String, Object> properties) { + public void interceptProperties(Map<String, Object> properties) { // do nothing by default } @@ -168,7 +184,7 @@ public abstract class AbstractApiEndpoint<E extends ApiName, T> extends DefaultE * * @return endpoint configuration object */ - public T getConfiguration() { + public final T getConfiguration() { return configuration; } @@ -176,7 +192,7 @@ public abstract class AbstractApiEndpoint<E extends ApiName, T> extends DefaultE * Returns API name. * @return apiName property. */ - public E getApiName() { + public final E getApiName() { return apiName; } @@ -184,7 +200,7 @@ public abstract class AbstractApiEndpoint<E extends ApiName, T> extends DefaultE * Returns method name. * @return methodName property. */ - public String getMethodName() { + public final String getMethodName() { return methodName; } @@ -192,7 +208,7 @@ public abstract class AbstractApiEndpoint<E extends ApiName, T> extends DefaultE * Returns method helper. * @return methodHelper property. */ - public ApiMethodHelper<? extends ApiMethod> getMethodHelper() { + public final ApiMethodHelper<? extends ApiMethod> getMethodHelper() { return methodHelper; } @@ -200,15 +216,15 @@ public abstract class AbstractApiEndpoint<E extends ApiName, T> extends DefaultE * Returns candidate methods for this endpoint. * @return list of candidate methods. */ - public List<ApiMethod> getCandidates() { - return Collections.unmodifiableList(candidates); + public final List<ApiMethod> getCandidates() { + return candidates; } /** * Returns name of parameter passed in the exchange In Body. * @return inBody property. */ - public String getInBody() { + public final String getInBody() { return inBody; } @@ -217,7 +233,7 @@ public abstract class AbstractApiEndpoint<E extends ApiName, T> extends DefaultE * @param inBody parameter name * @throws IllegalArgumentException for invalid parameter name. */ - public void setInBody(String inBody) throws IllegalArgumentException { + public final void setInBody(String inBody) throws IllegalArgumentException { // validate property name ObjectHelper.notNull(inBody, "inBody"); if (!getPropertiesHelper().getValidEndpointProperties(getConfiguration()).contains(inBody)) { @@ -226,6 +242,14 @@ public abstract class AbstractApiEndpoint<E extends ApiName, T> extends DefaultE this.inBody = inBody; } + public final Set<String> getEndpointPropertyNames() { + return endpointPropertyNames; + } + + public final Map<String, Object> getEndpointProperties() { + return endpointProperties; + } + /** * Returns an instance of an API Proxy based on apiName, method and args. * Called by {@link AbstractApiConsumer} or {@link AbstractApiProducer}. @@ -237,4 +261,50 @@ public abstract class AbstractApiEndpoint<E extends ApiName, T> extends DefaultE * @see AbstractApiConsumer */ public abstract Object getApiProxy(ApiMethod method, Map<String, Object> args); + + private static ExecutorService getExecutorService( + Class<? extends AbstractApiEndpoint> endpointClass, CamelContext context, String threadProfileName) { + + // lookup executorService for extending class name + final String endpointClassName = endpointClass.getName(); + ExecutorService executorService = executorServiceMap.get(endpointClassName); + + // CamelContext will shutdown thread pool when it shutdown so we can + // lazy create it on demand + // but in case of hot-deploy or the likes we need to be able to + // re-create it (its a shared static instance) + if (executorService == null || executorService.isTerminated() || executorService.isShutdown()) { + final ExecutorServiceManager manager = context.getExecutorServiceManager(); + + // try to lookup a pool first based on profile + ThreadPoolProfile poolProfile = manager.getThreadPoolProfile( + threadProfileName); + if (poolProfile == null) { + poolProfile = manager.getDefaultThreadPoolProfile(); + } + + // create a new pool using the custom or default profile + executorService = manager.newScheduledThreadPool(endpointClass, threadProfileName, poolProfile); + + executorServiceMap.put(endpointClassName, executorService); + } + + return executorService; + } + + public final ExecutorService getExecutorService() { + if (this.executorService == null) { + // synchronize on class to avoid creating duplicate class level executors + synchronized (getClass()) { + this.executorService = getExecutorService(getClass(), getCamelContext(), getThreadProfileName()); + } + } + return this.executorService; + } + + /** + * Returns Thread profile name. Generated as a constant THREAD_PROFILE_NAME in *Constants. + * @return thread profile name to use. + */ + protected abstract String getThreadProfileName(); } http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiProducer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiProducer.java b/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiProducer.java index 55e551f..17276aa 100644 --- a/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiProducer.java +++ b/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiProducer.java @@ -23,12 +23,9 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import org.apache.camel.AsyncCallback; -import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; import org.apache.camel.impl.DefaultAsyncProducer; -import org.apache.camel.spi.ExecutorServiceManager; -import org.apache.camel.spi.ThreadPoolProfile; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,10 +33,8 @@ import org.slf4j.LoggerFactory; /** * Base class for API based Producers */ -public abstract class AbstractApiProducer<E extends Enum<E> & ApiName, T> extends DefaultAsyncProducer { - - // thread pool executor - protected static ExecutorService executorService; +public abstract class AbstractApiProducer<E extends Enum<E> & ApiName, T> + extends DefaultAsyncProducer implements PropertiesInterceptor, ResultInterceptor { // API Endpoint protected final AbstractApiEndpoint<E, T> endpoint; @@ -53,6 +48,9 @@ public abstract class AbstractApiProducer<E extends Enum<E> & ApiName, T> extend // logger private final transient Logger log = LoggerFactory.getLogger(getClass()); + // cached Endpoint executor service + private ExecutorService executorService; + public AbstractApiProducer(AbstractApiEndpoint<E, T> endpoint, ApiMethodPropertiesHelper propertiesHelper) { super(endpoint); this.propertiesHelper = propertiesHelper; @@ -64,7 +62,7 @@ public abstract class AbstractApiProducer<E extends Enum<E> & ApiName, T> extend public boolean process(final Exchange exchange, final AsyncCallback callback) { // properties for method arguments final Map<String, Object> properties = new HashMap<String, Object>(); - propertiesHelper.getEndpointProperties(endpoint.getConfiguration(), properties); + properties.putAll(endpoint.getEndpointProperties()); propertiesHelper.getExchangeProperties(exchange, properties); // let the endpoint and the Producer intercept properties @@ -96,7 +94,7 @@ public abstract class AbstractApiProducer<E extends Enum<E> & ApiName, T> extend // copy headers exchange.getOut().setHeaders(exchange.getIn().getHeaders()); - doProcessResult(exchange); + interceptResult(result, exchange); } catch (Throwable t) { exchange.setException(ObjectHelper.wrapRuntimeCamelException(t)); @@ -106,17 +104,13 @@ public abstract class AbstractApiProducer<E extends Enum<E> & ApiName, T> extend } }; - getExecutorService(getEndpoint().getCamelContext()).submit(invocation); + endpoint.getExecutorService().submit(invocation); return false; } - /** - * Intercept method invocation arguments used to find and invoke API method. - * Can be overridden to add custom/hidden method arguments. - * @param properties method invocation arguments. - */ + @Override @SuppressWarnings("unused") - protected void interceptProperties(Map<String, Object> properties) { + public void interceptProperties(Map<String, Object> properties) { // do nothing by default } @@ -131,12 +125,15 @@ public abstract class AbstractApiProducer<E extends Enum<E> & ApiName, T> extend return ApiMethodHelper.invokeMethod(endpoint.getApiProxy(method, properties), method, properties); } - /** - * Do additional result processing, for example, add custom headers, etc. - * @param resultExchange API method result as exchange. - */ + @Override + public final Object splitResult(Object result) { + // producer never splits results + return result; + } + + @Override @SuppressWarnings("unused") - protected void doProcessResult(Exchange resultExchange) { + public void interceptResult(Object methodResult, Exchange resultExchange) { // do nothing by default } @@ -192,34 +189,4 @@ public abstract class AbstractApiProducer<E extends Enum<E> & ApiName, T> extend return true; } - - private synchronized ExecutorService getExecutorService(CamelContext context) { - // CamelContext will shutdown thread pool when it shutdown so we can - // lazy create it on demand - // but in case of hot-deploy or the likes we need to be able to - // re-create it (its a shared static instance) - if (executorService == null || executorService.isTerminated() || executorService.isShutdown()) { - final ExecutorServiceManager manager = context.getExecutorServiceManager(); - - // try to lookup a pool first based on profile - final String threadProfileName = getThreadProfileName(); - ThreadPoolProfile poolProfile = manager.getThreadPoolProfile( - threadProfileName); - if (poolProfile == null) { - poolProfile = manager.getDefaultThreadPoolProfile(); - } - - // create a new pool using the custom or default profile - executorService = manager.newScheduledThreadPool(getClass(), - threadProfileName, poolProfile); - } - - return executorService; - } - - /** - * Returns Thread profile name. Generated as a constant THREAD_PROFILE_NAME in *Constants. - * @return thread profile name to use. - */ - protected abstract String getThreadProfileName(); } http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/camel-core/src/main/java/org/apache/camel/util/component/ApiConsumerHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/component/ApiConsumerHelper.java b/camel-core/src/main/java/org/apache/camel/util/component/ApiConsumerHelper.java new file mode 100644 index 0000000..779dea0 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/util/component/ApiConsumerHelper.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.component; + +import java.lang.reflect.Array; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class for API Consumers. + */ +public final class ApiConsumerHelper { + + private static final Logger LOG = LoggerFactory.getLogger(ApiConsumerHelper.class); + + private ApiConsumerHelper() { + } + + /** + * Utility method to find matching API Method for supplied endpoint's configuration properties. + * @param endpoint endpoint for configuration properties. + * @param propertyNamesInterceptor names interceptor for adapting property names, usually the consumer class itself. + * @param <E> ApiName enumeration. + * @param <T> Component configuration class. + * @return matching ApiMethod. + */ + public static <E extends Enum<E> & ApiName, T> ApiMethod findMethod( + AbstractApiEndpoint<E, T> endpoint, PropertyNamesInterceptor propertyNamesInterceptor) { + + ApiMethod result; + // find one that takes the largest subset of endpoint parameters + final Set<String> argNames = new HashSet<String>(); + argNames.addAll(endpoint.getEndpointPropertyNames()); + + propertyNamesInterceptor.interceptPropertyNames(argNames); + + final String[] argNamesArray = argNames.toArray(new String[argNames.size()]); + List<ApiMethod> filteredMethods = ApiMethodHelper.filterMethods( + endpoint.getCandidates(), ApiMethodHelper.MatchType.SUPER_SET, argNamesArray); + + if (filteredMethods.isEmpty()) { + ApiMethodHelper methodHelper = endpoint.getMethodHelper(); + throw new IllegalArgumentException( + String.format("Missing properties for %s/%s, need one or more from %s", + endpoint.getApiName().getName(), endpoint.getMethodName(), + methodHelper.getMissingProperties(endpoint.getMethodName(), argNames))); + } else if (filteredMethods.size() == 1) { + // single match + result = filteredMethods.get(0); + } else { + result = ApiMethodHelper.getHighestPriorityMethod(filteredMethods); + LOG.warn(String.format("Using highest priority operation %s from operations %s for endpoint %s", + result, filteredMethods, endpoint.getEndpointUri())); + } + + return result; + } + + /** + * Utility method for Consumers to process API method invocation result. + * @param consumer Consumer that wants to process results. + * @param result result of API method invocation. + * @param splitResult true if the Consumer wants to split result using {@link org.apache.camel.util.component.ResultInterceptor#splitResult(Object)} method. + * @param <T> Consumer class that extends DefaultConsumer and implements {@link org.apache.camel.util.component.ResultInterceptor}. + * @return number of result exchanges processed. + * @throws Exception on error. + */ + public static <T extends DefaultConsumer & ResultInterceptor> int getResultsProcessed( + T consumer, Object result, boolean splitResult) throws Exception { + + // process result according to type + if (result != null && splitResult) { + // try to split the result + final Object resultArray = consumer.splitResult(result); + + if (resultArray != result && resultArray.getClass().isArray()) { + // create an exchange for every element + final int length = Array.getLength(resultArray); + for (int i = 0; i < length; i++) { + processResult(consumer, result, Array.get(resultArray, i)); + } + return length; + } + } + + processResult(consumer, result, result); + return 1; // number of messages polled + } + + private static <T extends DefaultConsumer & ResultInterceptor> void processResult(T consumer, Object methodResult, Object result) + throws Exception { + + Exchange exchange = consumer.getEndpoint().createExchange(); + exchange.getIn().setBody(result); + + consumer.interceptResult(methodResult, exchange); + try { + // send message to next processor in the route + consumer.getProcessor().process(exchange); + } finally { + // log exception if an exception occurred and was not handled + final Exception exception = exchange.getException(); + if (exception != null) { + consumer.getExceptionHandler().handleException("Error processing exchange", exchange, exception); + } + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/camel-core/src/main/java/org/apache/camel/util/component/ApiMethodPropertiesHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/component/ApiMethodPropertiesHelper.java b/camel-core/src/main/java/org/apache/camel/util/component/ApiMethodPropertiesHelper.java index 685af97..e205a25 100644 --- a/camel-core/src/main/java/org/apache/camel/util/component/ApiMethodPropertiesHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/component/ApiMethodPropertiesHelper.java @@ -71,8 +71,7 @@ public abstract class ApiMethodPropertiesHelper<C> { return properties; } - public void getEndpointProperties(Object endpointConfiguration, - Map<String, Object> properties) { + public void getEndpointProperties(Object endpointConfiguration, Map<String, Object> properties) { Set<String> names = null; if (IntrospectionSupport.getProperties(endpointConfiguration, properties, null, false)) { http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/camel-core/src/main/java/org/apache/camel/util/component/PropertiesInterceptor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/component/PropertiesInterceptor.java b/camel-core/src/main/java/org/apache/camel/util/component/PropertiesInterceptor.java new file mode 100644 index 0000000..c03e672 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/util/component/PropertiesInterceptor.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.component; + +import java.util.Map; + +/** + * Intercepts Property values used to invoke API Method. + */ +public interface PropertiesInterceptor { + + /** + * Intercept method invocation arguments used to find and invoke API method. + * Can be overridden to add custom/hidden method arguments. + * @param properties method invocation arguments. + */ + void interceptProperties(Map<String, Object> properties); +} http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/camel-core/src/main/java/org/apache/camel/util/component/PropertyNamesInterceptor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/component/PropertyNamesInterceptor.java b/camel-core/src/main/java/org/apache/camel/util/component/PropertyNamesInterceptor.java new file mode 100644 index 0000000..ac2bcf3 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/util/component/PropertyNamesInterceptor.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.component; + +import java.util.Set; + +/** + * Intercepts Property names used to find methods. + */ +public interface PropertyNamesInterceptor { + + /** + * Intercept property names used to find API Method. + * Used to add any custom/hidden method arguments, which MUST be provided in interceptProperties() override. + * @param propertyNames argument names. + */ + void interceptPropertyNames(Set<String> propertyNames); +} http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/camel-core/src/main/java/org/apache/camel/util/component/ResultInterceptor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/component/ResultInterceptor.java b/camel-core/src/main/java/org/apache/camel/util/component/ResultInterceptor.java new file mode 100644 index 0000000..716ee44 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/util/component/ResultInterceptor.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.component; + +import org.apache.camel.Exchange; + +/** + * Intercepts API method invocation result Exchange. + */ +public interface ResultInterceptor { + + /** + * Split a complex result into result elements. + * @param result API method invocation result + * @return either the same result if it cannot be split, or an array object with split results + */ + Object splitResult(Object result); + + /** + * Do additional result exchange processing, for example, adding custom headers. + * @param result result of API method invocation. + * @param resultExchange result as a Camel exchange, may be a split result from Arrays or Collections. + */ + void interceptResult(Object result, Exchange resultExchange); +} http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Endpoint.java ---------------------------------------------------------------------- diff --git a/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Endpoint.java b/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Endpoint.java index 2db480e..e7a0340 100644 --- a/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Endpoint.java +++ b/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Endpoint.java @@ -30,6 +30,7 @@ import ${package}.api.${name}FileHello; import ${package}.api.${name}JavadocHello; import ${package}.internal.${name}ApiCollection; import ${package}.internal.${name}ApiName; +import ${package}.internal.${name}Constants; import ${package}.internal.${name}PropertiesHelper; /** @@ -67,6 +68,10 @@ public class ${name}Endpoint extends AbstractApiEndpoint<${name}ApiName, ${name} return ${name}PropertiesHelper.getHelper(); } + protected String getThreadProfileName() { + return ${name}Constants.THREAD_PROFILE_NAME; + } + @Override protected void afterConfigureProperties() { // TODO create API proxy, set connection properties, etc. http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Producer.java ---------------------------------------------------------------------- diff --git a/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Producer.java b/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Producer.java index 3a9adba..c75c87e 100644 --- a/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Producer.java +++ b/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Producer.java @@ -19,7 +19,6 @@ package ${package}; import org.apache.camel.util.component.AbstractApiProducer; import ${package}.internal.${name}ApiName; -import ${package}.internal.${name}Constants; import ${package}.internal.${name}PropertiesHelper; /** @@ -30,8 +29,4 @@ public class ${name}Producer extends AbstractApiProducer<${name}ApiName, ${name} public ${name}Producer(${name}Endpoint endpoint) { super(endpoint, ${name}PropertiesHelper.getHelper()); } - - protected String getThreadProfileName() { - return ${name}Constants.THREAD_PROFILE_NAME; - } }