Repository: camel
Updated Branches:
  refs/heads/master 685ffd5d7 -> df062eaf4


Refactored API Consumer to support non-ScheduledPollConsumer using 
ApiConsumerHelper, moved getThreadProfileName() from Producer to Endpoint, to 
allow Consumers to access component executor service


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/df062eaf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/df062eaf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/df062eaf

Branch: refs/heads/master
Commit: df062eaf46894b85049f4aa043fdf4121c788771
Parents: 685ffd5
Author: Dhiraj Bokde <dhira...@yahoo.com>
Authored: Tue Jul 1 09:25:29 2014 -0700
Committer: Dhiraj Bokde <dhira...@yahoo.com>
Committed: Tue Jul 1 09:26:01 2014 -0700

----------------------------------------------------------------------
 .../util/component/AbstractApiConsumer.java     | 129 +++++--------------
 .../util/component/AbstractApiEndpoint.java     | 118 +++++++++++++----
 .../util/component/AbstractApiProducer.java     |  69 +++-------
 .../camel/util/component/ApiConsumerHelper.java | 128 ++++++++++++++++++
 .../component/ApiMethodPropertiesHelper.java    |   3 +-
 .../util/component/PropertiesInterceptor.java   |  32 +++++
 .../component/PropertyNamesInterceptor.java     |  32 +++++
 .../camel/util/component/ResultInterceptor.java |  39 ++++++
 .../src/main/java/__name__Endpoint.java         |   5 +
 .../src/main/java/__name__Producer.java         |   5 -
 10 files changed, 381 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiConsumer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiConsumer.java
 
b/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiConsumer.java
index fca429f..b0f50e8 100644
--- 
a/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiConsumer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiConsumer.java
@@ -16,12 +16,8 @@
  */
 package org.apache.camel.util.component;
 
-import java.lang.reflect.Array;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -35,7 +31,8 @@ import org.slf4j.LoggerFactory;
 /**
  * Abstract base class for API Component Consumers.
  */
-public abstract class AbstractApiConsumer<E extends Enum<E> & ApiName, T> 
extends ScheduledPollConsumer {
+public abstract class AbstractApiConsumer<E extends Enum<E> & ApiName, T>
+    extends ScheduledPollConsumer implements PropertyNamesInterceptor, 
PropertiesInterceptor, ResultInterceptor {
 
     // logger
     protected final Logger log = LoggerFactory.getLogger(getClass());
@@ -43,31 +40,17 @@ public abstract class AbstractApiConsumer<E extends Enum<E> 
& ApiName, T> extend
     // API Endpoint
     protected final AbstractApiEndpoint<E, T> endpoint;
 
-    // helpers
-    protected final ApiMethodPropertiesHelper<T> propertiesHelper;
-    protected final ApiMethodHelper<? extends ApiMethod> methodHelper;
-
     // API method to invoke
     protected final ApiMethod method;
 
-    // properties used to invoke
-    protected final Map<String, Object> endpointProperties;
+    // split Array or Collection API method results into multiple Exchanges
+    private boolean splitResult = true;
 
     public AbstractApiConsumer(AbstractApiEndpoint<E, T> endpoint, Processor 
processor) {
         super(endpoint, processor);
 
         this.endpoint = endpoint;
-
-        // cache helpers
-        this.methodHelper = endpoint.getMethodHelper();
-        this.propertiesHelper = endpoint.getPropertiesHelper();
-
-        // get endpoint properties in a map
-        final HashMap<String, Object> properties = new HashMap<String, 
Object>();
-        propertiesHelper.getEndpointProperties(endpoint.getConfiguration(), 
properties);
-        this.endpointProperties = Collections.unmodifiableMap(properties);
-
-        this.method = findMethod();
+        this.method = ApiConsumerHelper.findMethod(endpoint, this);
     }
 
     @Override
@@ -76,83 +59,35 @@ public abstract class AbstractApiConsumer<E extends Enum<E> 
& ApiName, T> extend
         return false;
     }
 
-    private ApiMethod findMethod() {
-
-        ApiMethod result;
-        // find one that takes the largest subset of endpoint parameters
-        final Set<String> argNames = new HashSet<String>();
-        
argNames.addAll(propertiesHelper.getEndpointPropertyNames(endpoint.getConfiguration()));
-
-        interceptPropertyNames(argNames);
-
-        final String[] argNamesArray = argNames.toArray(new 
String[argNames.size()]);
-        List<ApiMethod> filteredMethods = ApiMethodHelper.filterMethods(
-                endpoint.getCandidates(), ApiMethodHelper.MatchType.SUPER_SET, 
argNamesArray);
-
-        if (filteredMethods.isEmpty()) {
-            throw new IllegalArgumentException(
-                    String.format("Missing properties for %s/%s, need one or 
more from %s",
-                            endpoint.getApiName().getName(), 
endpoint.getMethodName(),
-                            
methodHelper.getMissingProperties(endpoint.getMethodName(), argNames)));
-        } else if (filteredMethods.size() == 1) {
-            // single match
-            result = filteredMethods.get(0);
-        } else {
-            result = ApiMethodHelper.getHighestPriorityMethod(filteredMethods);
-            log.warn("Using highest priority operation {} from operations {}", 
method, filteredMethods);
-        }
-
-        return result;
-    }
-
     @Override
     protected int poll() throws Exception {
         // invoke the consumer method
         final Map<String, Object> args = new HashMap<String, Object>();
-        args.putAll(endpointProperties);
+        args.putAll(endpoint.getEndpointProperties());
 
         // let the endpoint and the Consumer intercept properties
         endpoint.interceptProperties(args);
         interceptProperties(args);
 
         try {
+
             Object result = doInvokeMethod(args);
+            return ApiConsumerHelper.getResultsProcessed(this, result, 
isSplitResult());
 
-            // process result according to type
-            if (result != null && (result instanceof Collection || 
result.getClass().isArray())) {
-                // create an exchange for every element
-                final Object array = getResultAsArray(result);
-                final int length = Array.getLength(array);
-                for (int i = 0; i < length; i++) {
-                    processResult(Array.get(array, i));
-                }
-                return length;
-            } else {
-                processResult(result);
-                return 1; // number of messages polled
-            }
         } catch (Throwable t) {
             throw ObjectHelper.wrapRuntimeCamelException(t);
         }
     }
 
-    /**
-     * Intercept property names used to find Consumer method.
-     * Used to add any custom/hidden method arguments, which MUST be provided 
in interceptProperties() override.
-     * @param propertyNames argument names.
-     */
+    @Override
     @SuppressWarnings("unused")
-    protected void interceptPropertyNames(Set<String> propertyNames) {
+    public void interceptPropertyNames(Set<String> propertyNames) {
         // do nothing by default
     }
 
-    /**
-     * Intercept method invocation arguments used to find and invoke API 
method.
-     * Can be overridden to add custom/hidden method arguments.
-     * @param properties method invocation arguments.
-     */
+    @Override
     @SuppressWarnings("unused")
-    protected void interceptProperties(Map<String, Object> properties) {
+    public void interceptProperties(Map<String, Object> properties) {
         // do nothing by default
     }
 
@@ -167,33 +102,24 @@ public abstract class AbstractApiConsumer<E extends 
Enum<E> & ApiName, T> extend
         return ApiMethodHelper.invokeMethod(endpoint.getApiProxy(method, 
args), method, args);
     }
 
-    private void processResult(Object result) throws Exception {
-        Exchange exchange = getEndpoint().createExchange();
-        exchange.getIn().setBody(result);
-
-        interceptResult(exchange);
-        try {
-            // send message to next processor in the route
-            getProcessor().process(exchange);
-        } finally {
-            // log exception if an exception occurred and was not handled
-            final Exception exception = exchange.getException();
-            if (exception != null) {
-                getExceptionHandler().handleException("Error processing 
exchange", exchange, exception);
-            }
+    @Override
+    public Object splitResult(Object result) {
+        // process result according to type
+        if (splitResult && result != null && (result instanceof Collection || 
result.getClass().isArray())) {
+            // create an exchange for every element
+            return getResultAsArray(result);
+        } else {
+            return result;
         }
     }
 
-    /**
-     * Derived classes can do additional result exchange processing, for 
example, adding custom headers.
-     * @param resultExchange result as a Camel exchange.
-     */
+    @Override
     @SuppressWarnings("unused")
-    protected void interceptResult(Exchange resultExchange) {
+    public void interceptResult(Object result, Exchange resultExchange) {
         // do nothing by default
     }
 
-    private Object getResultAsArray(Object result) {
+    private static Object getResultAsArray(Object result) {
         if (result.getClass().isArray()) {
             // no conversion needed
             return result;
@@ -202,4 +128,13 @@ public abstract class AbstractApiConsumer<E extends 
Enum<E> & ApiName, T> extend
         Collection<?> collection = (Collection<?>) result;
         return collection.toArray(new Object[collection.size()]);
     }
+
+    public final boolean isSplitResult() {
+        return splitResult;
+    }
+
+    @SuppressWarnings("unused")
+    public final void setSplitResult(boolean splitResult) {
+        this.splitResult = splitResult;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiEndpoint.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiEndpoint.java
 
b/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiEndpoint.java
index 260796d..68a9df6 100644
--- 
a/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiEndpoint.java
+++ 
b/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiEndpoint.java
@@ -18,13 +18,19 @@ package org.apache.camel.util.component;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Component;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.spi.ExecutorServiceManager;
+import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -34,7 +40,11 @@ import org.slf4j.LoggerFactory;
 /**
  * Abstract base class for API Component Endpoints.
  */
-public abstract class AbstractApiEndpoint<E extends ApiName, T> extends 
DefaultEndpoint {
+public abstract class AbstractApiEndpoint<E extends ApiName, T>
+    extends DefaultEndpoint implements PropertyNamesInterceptor, 
PropertiesInterceptor {
+
+    // thread pool executor with Endpoint Class name as keys
+    private static Map<String, ExecutorService> executorServiceMap = new 
ConcurrentHashMap<String, ExecutorService>();
 
     // logger
     protected final Logger log = LoggerFactory.getLogger(getClass());
@@ -59,6 +69,13 @@ public abstract class AbstractApiEndpoint<E extends ApiName, 
T> extends DefaultE
     // candidate methods based on method name and endpoint configuration
     private List<ApiMethod> candidates;
 
+    // cached Executor service
+    private ExecutorService executorService;
+
+    // cached property names and values
+    private Set<String> endpointPropertyNames;
+    private Map<String, Object> endpointProperties;
+
     public AbstractApiEndpoint(String endpointUri, Component component,
                                E apiName, String methodName, ApiMethodHelper<? 
extends ApiMethod> methodHelper, T endpointConfiguration) {
         super(endpointUri, component);
@@ -106,11 +123,18 @@ public abstract class AbstractApiEndpoint<E extends 
ApiName, T> extends DefaultE
     /**
      * Initialize endpoint state, including endpoint arguments, find candidate 
methods, etc.
      */
-    protected void initState() {
+    private void initState() {
+
+        // compute endpoint property names and values
+        this.endpointPropertyNames = Collections.unmodifiableSet(
+            getPropertiesHelper().getEndpointPropertyNames(configuration));
+        final HashMap<String, Object> properties = new HashMap<String, 
Object>();
+        getPropertiesHelper().getEndpointProperties(configuration, properties);
+        this.endpointProperties = Collections.unmodifiableMap(properties);
 
         // get endpoint property names
         final Set<String> arguments = new HashSet<String>();
-        
arguments.addAll(getPropertiesHelper().getEndpointPropertyNames(getConfiguration()));
+        arguments.addAll(endpointPropertyNames);
 
         // add inBody argument for producers
         if (inBody != null) {
@@ -124,6 +148,7 @@ public abstract class AbstractApiEndpoint<E extends 
ApiName, T> extends DefaultE
         // create a list of candidate methods
         candidates = new ArrayList<ApiMethod>();
         candidates.addAll(methodHelper.getCandidateMethods(methodName, 
argNames));
+        candidates = Collections.unmodifiableList(candidates);
 
         // error if there are no candidates
         if (candidates.isEmpty()) {
@@ -141,24 +166,15 @@ public abstract class AbstractApiEndpoint<E extends 
ApiName, T> extends DefaultE
         }
     }
 
-    /**
-     * Intercept property names used to find Consumer and Producer methods.
-     * Used to add any custom/hidden method arguments, which MUST be provided 
in interceptProperties() override
-     * either in Endpoint, or Consumer and Producer.
-     * @param propertyNames argument names.
-     */
+    @Override
     @SuppressWarnings("unused")
-    protected void interceptPropertyNames(Set<String> propertyNames) {
+    public void interceptPropertyNames(Set<String> propertyNames) {
         // do nothing by default
     }
 
-    /**
-     * Intercept method invocation arguments used to find and invoke API 
method. Called by Consumer and Producer.
-     * Must be overridden if also overriding interceptPropertyName() to add 
custom/hidden method properties.
-     * @param properties method invocation arguments.
-     */
+    @Override
     @SuppressWarnings("unused")
-    protected void interceptProperties(Map<String, Object> properties) {
+    public void interceptProperties(Map<String, Object> properties) {
         // do nothing by default
     }
 
@@ -168,7 +184,7 @@ public abstract class AbstractApiEndpoint<E extends 
ApiName, T> extends DefaultE
      *
      * @return endpoint configuration object
      */
-    public T getConfiguration() {
+    public final T getConfiguration() {
         return configuration;
     }
 
@@ -176,7 +192,7 @@ public abstract class AbstractApiEndpoint<E extends 
ApiName, T> extends DefaultE
      * Returns API name.
      * @return apiName property.
      */
-    public E getApiName() {
+    public final E getApiName() {
         return apiName;
     }
 
@@ -184,7 +200,7 @@ public abstract class AbstractApiEndpoint<E extends 
ApiName, T> extends DefaultE
      * Returns method name.
      * @return methodName property.
      */
-    public String getMethodName() {
+    public final String getMethodName() {
         return methodName;
     }
 
@@ -192,7 +208,7 @@ public abstract class AbstractApiEndpoint<E extends 
ApiName, T> extends DefaultE
      * Returns method helper.
      * @return methodHelper property.
      */
-    public ApiMethodHelper<? extends ApiMethod> getMethodHelper() {
+    public final ApiMethodHelper<? extends ApiMethod> getMethodHelper() {
         return methodHelper;
     }
 
@@ -200,15 +216,15 @@ public abstract class AbstractApiEndpoint<E extends 
ApiName, T> extends DefaultE
      * Returns candidate methods for this endpoint.
      * @return list of candidate methods.
      */
-    public List<ApiMethod> getCandidates() {
-        return Collections.unmodifiableList(candidates);
+    public final List<ApiMethod> getCandidates() {
+        return candidates;
     }
 
     /**
      * Returns name of parameter passed in the exchange In Body.
      * @return inBody property.
      */
-    public String getInBody() {
+    public final String getInBody() {
         return inBody;
     }
 
@@ -217,7 +233,7 @@ public abstract class AbstractApiEndpoint<E extends 
ApiName, T> extends DefaultE
      * @param inBody parameter name
      * @throws IllegalArgumentException for invalid parameter name.
      */
-    public void setInBody(String inBody) throws IllegalArgumentException {
+    public final void setInBody(String inBody) throws IllegalArgumentException 
{
         // validate property name
         ObjectHelper.notNull(inBody, "inBody");
         if 
(!getPropertiesHelper().getValidEndpointProperties(getConfiguration()).contains(inBody))
 {
@@ -226,6 +242,14 @@ public abstract class AbstractApiEndpoint<E extends 
ApiName, T> extends DefaultE
         this.inBody = inBody;
     }
 
+    public final Set<String> getEndpointPropertyNames() {
+        return endpointPropertyNames;
+    }
+
+    public final Map<String, Object> getEndpointProperties() {
+        return endpointProperties;
+    }
+
     /**
      * Returns an instance of an API Proxy based on apiName, method and args.
      * Called by {@link AbstractApiConsumer} or {@link AbstractApiProducer}.
@@ -237,4 +261,50 @@ public abstract class AbstractApiEndpoint<E extends 
ApiName, T> extends DefaultE
      * @see AbstractApiConsumer
      */
     public abstract Object getApiProxy(ApiMethod method, Map<String, Object> 
args);
+
+    private static ExecutorService getExecutorService(
+        Class<? extends AbstractApiEndpoint> endpointClass, CamelContext 
context, String threadProfileName) {
+
+        // lookup executorService for extending class name
+        final String endpointClassName = endpointClass.getName();
+        ExecutorService executorService = 
executorServiceMap.get(endpointClassName);
+
+        // CamelContext will shutdown thread pool when it shutdown so we can
+        // lazy create it on demand
+        // but in case of hot-deploy or the likes we need to be able to
+        // re-create it (its a shared static instance)
+        if (executorService == null || executorService.isTerminated() || 
executorService.isShutdown()) {
+            final ExecutorServiceManager manager = 
context.getExecutorServiceManager();
+
+            // try to lookup a pool first based on profile
+            ThreadPoolProfile poolProfile = manager.getThreadPoolProfile(
+                threadProfileName);
+            if (poolProfile == null) {
+                poolProfile = manager.getDefaultThreadPoolProfile();
+            }
+
+            // create a new pool using the custom or default profile
+            executorService = manager.newScheduledThreadPool(endpointClass, 
threadProfileName, poolProfile);
+
+            executorServiceMap.put(endpointClassName, executorService);
+        }
+
+        return executorService;
+    }
+
+    public final ExecutorService getExecutorService() {
+        if (this.executorService == null) {
+            // synchronize on class to avoid creating duplicate class level 
executors
+            synchronized (getClass()) {
+                this.executorService = getExecutorService(getClass(), 
getCamelContext(), getThreadProfileName());
+            }
+        }
+        return this.executorService;
+    }
+
+    /**
+     * Returns Thread profile name. Generated as a constant 
THREAD_PROFILE_NAME in *Constants.
+     * @return thread profile name to use.
+     */
+    protected abstract String getThreadProfileName();
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiProducer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiProducer.java
 
b/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiProducer.java
index 55e551f..17276aa 100644
--- 
a/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiProducer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/util/component/AbstractApiProducer.java
@@ -23,12 +23,9 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AsyncCallback;
-import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.impl.DefaultAsyncProducer;
-import org.apache.camel.spi.ExecutorServiceManager;
-import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,10 +33,8 @@ import org.slf4j.LoggerFactory;
 /**
  * Base class for API based Producers
  */
-public abstract class AbstractApiProducer<E extends Enum<E> & ApiName, T> 
extends DefaultAsyncProducer {
-
-    // thread pool executor
-    protected static ExecutorService executorService;
+public abstract class AbstractApiProducer<E extends Enum<E> & ApiName, T>
+    extends DefaultAsyncProducer implements PropertiesInterceptor, 
ResultInterceptor {
 
     // API Endpoint
     protected final AbstractApiEndpoint<E, T> endpoint;
@@ -53,6 +48,9 @@ public abstract class AbstractApiProducer<E extends Enum<E> & 
ApiName, T> extend
     // logger
     private final transient Logger log = LoggerFactory.getLogger(getClass());
 
+    // cached Endpoint executor service
+    private ExecutorService executorService;
+
     public AbstractApiProducer(AbstractApiEndpoint<E, T> endpoint, 
ApiMethodPropertiesHelper propertiesHelper) {
         super(endpoint);
         this.propertiesHelper = propertiesHelper;
@@ -64,7 +62,7 @@ public abstract class AbstractApiProducer<E extends Enum<E> & 
ApiName, T> extend
     public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
         // properties for method arguments
         final Map<String, Object> properties = new HashMap<String, Object>();
-        propertiesHelper.getEndpointProperties(endpoint.getConfiguration(), 
properties);
+        properties.putAll(endpoint.getEndpointProperties());
         propertiesHelper.getExchangeProperties(exchange, properties);
 
         // let the endpoint and the Producer intercept properties
@@ -96,7 +94,7 @@ public abstract class AbstractApiProducer<E extends Enum<E> & 
ApiName, T> extend
                     // copy headers
                     
exchange.getOut().setHeaders(exchange.getIn().getHeaders());
 
-                    doProcessResult(exchange);
+                    interceptResult(result, exchange);
 
                 } catch (Throwable t) {
                     
exchange.setException(ObjectHelper.wrapRuntimeCamelException(t));
@@ -106,17 +104,13 @@ public abstract class AbstractApiProducer<E extends 
Enum<E> & ApiName, T> extend
             }
         };
 
-        getExecutorService(getEndpoint().getCamelContext()).submit(invocation);
+        endpoint.getExecutorService().submit(invocation);
         return false;
     }
 
-    /**
-     * Intercept method invocation arguments used to find and invoke API 
method.
-     * Can be overridden to add custom/hidden method arguments.
-     * @param properties method invocation arguments.
-     */
+    @Override
     @SuppressWarnings("unused")
-    protected void interceptProperties(Map<String, Object> properties) {
+    public void interceptProperties(Map<String, Object> properties) {
         // do nothing by default
     }
 
@@ -131,12 +125,15 @@ public abstract class AbstractApiProducer<E extends 
Enum<E> & ApiName, T> extend
         return ApiMethodHelper.invokeMethod(endpoint.getApiProxy(method, 
properties), method, properties);
     }
 
-    /**
-     * Do additional result processing, for example, add custom headers, etc.
-     * @param resultExchange API method result as exchange.
-     */
+    @Override
+    public final Object splitResult(Object result) {
+        // producer never splits results
+        return result;
+    }
+
+    @Override
     @SuppressWarnings("unused")
-    protected void doProcessResult(Exchange resultExchange) {
+    public void interceptResult(Object methodResult, Exchange resultExchange) {
         // do nothing by default
     }
 
@@ -192,34 +189,4 @@ public abstract class AbstractApiProducer<E extends 
Enum<E> & ApiName, T> extend
 
         return true;
     }
-
-    private synchronized ExecutorService getExecutorService(CamelContext 
context) {
-        // CamelContext will shutdown thread pool when it shutdown so we can
-        // lazy create it on demand
-        // but in case of hot-deploy or the likes we need to be able to
-        // re-create it (its a shared static instance)
-        if (executorService == null || executorService.isTerminated() || 
executorService.isShutdown()) {
-            final ExecutorServiceManager manager = 
context.getExecutorServiceManager();
-
-            // try to lookup a pool first based on profile
-            final String threadProfileName = getThreadProfileName();
-            ThreadPoolProfile poolProfile = manager.getThreadPoolProfile(
-                    threadProfileName);
-            if (poolProfile == null) {
-                poolProfile = manager.getDefaultThreadPoolProfile();
-            }
-
-            // create a new pool using the custom or default profile
-            executorService = manager.newScheduledThreadPool(getClass(),
-                    threadProfileName, poolProfile);
-        }
-
-        return executorService;
-    }
-
-    /**
-     * Returns Thread profile name. Generated as a constant 
THREAD_PROFILE_NAME in *Constants.
-     * @return thread profile name to use.
-     */
-    protected abstract String getThreadProfileName();
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/camel-core/src/main/java/org/apache/camel/util/component/ApiConsumerHelper.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/util/component/ApiConsumerHelper.java
 
b/camel-core/src/main/java/org/apache/camel/util/component/ApiConsumerHelper.java
new file mode 100644
index 0000000..779dea0
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/util/component/ApiConsumerHelper.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.component;
+
+import java.lang.reflect.Array;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for API Consumers.
+ */
+public final class ApiConsumerHelper {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ApiConsumerHelper.class);
+
+    private ApiConsumerHelper() {
+    }
+
+    /**
+     * Utility method to find matching API Method for supplied endpoint's 
configuration properties.
+     * @param endpoint endpoint for configuration properties.
+     * @param propertyNamesInterceptor names interceptor for adapting property 
names, usually the consumer class itself.
+     * @param <E> ApiName enumeration.
+     * @param <T> Component configuration class.
+     * @return matching ApiMethod.
+     */
+    public static <E extends Enum<E> & ApiName, T> ApiMethod findMethod(
+        AbstractApiEndpoint<E, T> endpoint, PropertyNamesInterceptor 
propertyNamesInterceptor) {
+
+        ApiMethod result;
+        // find one that takes the largest subset of endpoint parameters
+        final Set<String> argNames = new HashSet<String>();
+        argNames.addAll(endpoint.getEndpointPropertyNames());
+
+        propertyNamesInterceptor.interceptPropertyNames(argNames);
+
+        final String[] argNamesArray = argNames.toArray(new 
String[argNames.size()]);
+        List<ApiMethod> filteredMethods = ApiMethodHelper.filterMethods(
+                endpoint.getCandidates(), ApiMethodHelper.MatchType.SUPER_SET, 
argNamesArray);
+
+        if (filteredMethods.isEmpty()) {
+            ApiMethodHelper methodHelper = endpoint.getMethodHelper();
+            throw new IllegalArgumentException(
+                    String.format("Missing properties for %s/%s, need one or 
more from %s",
+                            endpoint.getApiName().getName(), 
endpoint.getMethodName(),
+                            
methodHelper.getMissingProperties(endpoint.getMethodName(), argNames)));
+        } else if (filteredMethods.size() == 1) {
+            // single match
+            result = filteredMethods.get(0);
+        } else {
+            result = ApiMethodHelper.getHighestPriorityMethod(filteredMethods);
+            LOG.warn(String.format("Using highest priority operation %s from 
operations %s for endpoint %s",
+                result, filteredMethods, endpoint.getEndpointUri()));
+        }
+
+        return result;
+    }
+
+    /**
+     * Utility method for Consumers to process API method invocation result.
+     * @param consumer Consumer that wants to process results.
+     * @param result result of API method invocation.
+     * @param splitResult true if the Consumer wants to split result using 
{@link org.apache.camel.util.component.ResultInterceptor#splitResult(Object)} 
method.
+     * @param <T> Consumer class that extends DefaultConsumer and implements 
{@link org.apache.camel.util.component.ResultInterceptor}.
+     * @return number of result exchanges processed.
+     * @throws Exception on error.
+     */
+    public static <T extends DefaultConsumer & ResultInterceptor> int 
getResultsProcessed(
+        T consumer, Object result, boolean splitResult) throws Exception {
+
+        // process result according to type
+        if (result != null && splitResult) {
+            // try to split the result
+            final Object resultArray = consumer.splitResult(result);
+
+            if (resultArray != result && resultArray.getClass().isArray()) {
+                // create an exchange for every element
+                final int length = Array.getLength(resultArray);
+                for (int i = 0; i < length; i++) {
+                    processResult(consumer, result, Array.get(resultArray, i));
+                }
+                return length;
+            }
+        }
+
+        processResult(consumer, result, result);
+        return 1; // number of messages polled
+    }
+
+    private static <T extends DefaultConsumer & ResultInterceptor> void 
processResult(T consumer, Object methodResult, Object result)
+        throws Exception {
+
+        Exchange exchange = consumer.getEndpoint().createExchange();
+        exchange.getIn().setBody(result);
+
+        consumer.interceptResult(methodResult, exchange);
+        try {
+            // send message to next processor in the route
+            consumer.getProcessor().process(exchange);
+        } finally {
+            // log exception if an exception occurred and was not handled
+            final Exception exception = exchange.getException();
+            if (exception != null) {
+                consumer.getExceptionHandler().handleException("Error 
processing exchange", exchange, exception);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/camel-core/src/main/java/org/apache/camel/util/component/ApiMethodPropertiesHelper.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/util/component/ApiMethodPropertiesHelper.java
 
b/camel-core/src/main/java/org/apache/camel/util/component/ApiMethodPropertiesHelper.java
index 685af97..e205a25 100644
--- 
a/camel-core/src/main/java/org/apache/camel/util/component/ApiMethodPropertiesHelper.java
+++ 
b/camel-core/src/main/java/org/apache/camel/util/component/ApiMethodPropertiesHelper.java
@@ -71,8 +71,7 @@ public abstract class ApiMethodPropertiesHelper<C> {
         return properties;
     }
 
-    public void getEndpointProperties(Object endpointConfiguration,
-                                             Map<String, Object> properties) {
+    public void getEndpointProperties(Object endpointConfiguration, 
Map<String, Object> properties) {
 
         Set<String> names = null;
         if (IntrospectionSupport.getProperties(endpointConfiguration, 
properties, null, false)) {

http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/camel-core/src/main/java/org/apache/camel/util/component/PropertiesInterceptor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/util/component/PropertiesInterceptor.java
 
b/camel-core/src/main/java/org/apache/camel/util/component/PropertiesInterceptor.java
new file mode 100644
index 0000000..c03e672
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/util/component/PropertiesInterceptor.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.component;
+
+import java.util.Map;
+
+/**
+ * Intercepts Property values used to invoke API Method.
+ */
+public interface PropertiesInterceptor {
+
+    /**
+     * Intercept method invocation arguments used to find and invoke API 
method.
+     * Can be overridden to add custom/hidden method arguments.
+     * @param properties method invocation arguments.
+     */
+    void interceptProperties(Map<String, Object> properties);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/camel-core/src/main/java/org/apache/camel/util/component/PropertyNamesInterceptor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/util/component/PropertyNamesInterceptor.java
 
b/camel-core/src/main/java/org/apache/camel/util/component/PropertyNamesInterceptor.java
new file mode 100644
index 0000000..ac2bcf3
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/util/component/PropertyNamesInterceptor.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.component;
+
+import java.util.Set;
+
+/**
+ * Intercepts Property names used to find methods.
+ */
+public interface PropertyNamesInterceptor {
+
+    /**
+     * Intercept property names used to find API Method.
+     * Used to add any custom/hidden method arguments, which MUST be provided 
in interceptProperties() override.
+     * @param propertyNames argument names.
+     */
+    void interceptPropertyNames(Set<String> propertyNames);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/camel-core/src/main/java/org/apache/camel/util/component/ResultInterceptor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/util/component/ResultInterceptor.java
 
b/camel-core/src/main/java/org/apache/camel/util/component/ResultInterceptor.java
new file mode 100644
index 0000000..716ee44
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/util/component/ResultInterceptor.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.component;
+
+import org.apache.camel.Exchange;
+
+/**
+ * Intercepts API method invocation result Exchange.
+ */
+public interface ResultInterceptor {
+
+    /**
+     * Split a complex result into result elements.
+     * @param result API method invocation result
+     * @return either the same result if it cannot be split, or an array 
object with split results
+     */
+    Object splitResult(Object result);
+
+    /**
+     * Do additional result exchange processing, for example, adding custom 
headers.
+     * @param result result of API method invocation.
+     * @param resultExchange result as a Camel exchange, may be a split result 
from Arrays or Collections.
+     */
+    void interceptResult(Object result, Exchange resultExchange);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Endpoint.java
----------------------------------------------------------------------
diff --git 
a/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Endpoint.java
 
b/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Endpoint.java
index 2db480e..e7a0340 100644
--- 
a/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Endpoint.java
+++ 
b/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Endpoint.java
@@ -30,6 +30,7 @@ import ${package}.api.${name}FileHello;
 import ${package}.api.${name}JavadocHello;
 import ${package}.internal.${name}ApiCollection;
 import ${package}.internal.${name}ApiName;
+import ${package}.internal.${name}Constants;
 import ${package}.internal.${name}PropertiesHelper;
 
 /**
@@ -67,6 +68,10 @@ public class ${name}Endpoint extends 
AbstractApiEndpoint<${name}ApiName, ${name}
         return ${name}PropertiesHelper.getHelper();
     }
 
+    protected String getThreadProfileName() {
+        return ${name}Constants.THREAD_PROFILE_NAME;
+    }
+
     @Override
     protected void afterConfigureProperties() {
         // TODO create API proxy, set connection properties, etc.

http://git-wip-us.apache.org/repos/asf/camel/blob/df062eaf/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Producer.java
----------------------------------------------------------------------
diff --git 
a/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Producer.java
 
b/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Producer.java
index 3a9adba..c75c87e 100644
--- 
a/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Producer.java
+++ 
b/tooling/archetypes/camel-archetype-api-component/src/main/resources/archetype-resources/__artifactId__-component/src/main/java/__name__Producer.java
@@ -19,7 +19,6 @@ package ${package};
 import org.apache.camel.util.component.AbstractApiProducer;
 
 import ${package}.internal.${name}ApiName;
-import ${package}.internal.${name}Constants;
 import ${package}.internal.${name}PropertiesHelper;
 
 /**
@@ -30,8 +29,4 @@ public class ${name}Producer extends 
AbstractApiProducer<${name}ApiName, ${name}
     public ${name}Producer(${name}Endpoint endpoint) {
         super(endpoint, ${name}PropertiesHelper.getHelper());
     }
-
-    protected String getThreadProfileName() {
-        return ${name}Constants.THREAD_PROFILE_NAME;
-    }
 }

Reply via email to