Repository: camel Updated Branches: refs/heads/camel-2.17.x 357ab2e96 -> 7aa181a4d
Fixed CS Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/63f64c7d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/63f64c7d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/63f64c7d Branch: refs/heads/camel-2.17.x Commit: 63f64c7d4f4d2a483cb903723faf7f68316325c4 Parents: 357ab2e Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Apr 28 19:26:39 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Apr 28 19:26:39 2016 +0200 ---------------------------------------------------------------------- .../apache/camel/component/cxf/CxfConsumer.java | 422 ++++++++++--------- ...nsumerDuplicateNamespaceStreamCacheTest.java | 2 +- .../camel/http/common/HttpCommonEndpoint.java | 6 +- 3 files changed, 221 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/63f64c7d/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java index 4c29f8a..11ec2f2 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java @@ -64,210 +64,7 @@ public class CxfConsumer extends DefaultConsumer { cxfEndpoint = endpoint; // create server ServerFactoryBean svrBean = endpoint.createServerFactoryBean(); - svrBean.setInvoker(new Invoker() { - // we receive a CXF request when this method is called - public Object invoke(Exchange cxfExchange, Object o) { - LOG.trace("Received CXF Request: {}", cxfExchange); - Continuation continuation; - if (!endpoint.isSynchronous() && isAsyncInvocationSupported(cxfExchange) - && (continuation = getContinuation(cxfExchange)) != null) { - LOG.trace("Calling the Camel async processors."); - return asyncInvoke(cxfExchange, continuation); - } else { - LOG.trace("Calling the Camel sync processors."); - return syncInvoke(cxfExchange); - } - } - // NOTE this code cannot work with CXF 2.2.x and JMSContinuation - // as it doesn't break out the interceptor chain when we call it - private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) { - synchronized (continuation) { - if (continuation.isNew()) { - final org.apache.camel.Exchange camelExchange = prepareCamelExchange(cxfExchange); - - // Now we don't set up the timeout value - LOG.trace("Suspending continuation of exchangeId: {}", camelExchange.getExchangeId()); - - // The continuation could be called before the suspend is called - continuation.suspend(cxfEndpoint.getContinuationTimeout()); - - // use the asynchronous API to process the exchange - getAsyncProcessor().process(camelExchange, new AsyncCallback() { - public void done(boolean doneSync) { - // make sure the continuation resume will not be called before the suspend method in other thread - synchronized (continuation) { - LOG.trace("Resuming continuation of exchangeId: {}", camelExchange.getExchangeId()); - // resume processing after both, sync and async callbacks - continuation.setObject(camelExchange); - continuation.resume(); - } - } - }); - - } else if (continuation.isResumed()) { - org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject(); - try { - setResponseBack(cxfExchange, camelExchange); - } finally { - CxfConsumer.this.doneUoW(camelExchange); - } - - } - } - return null; - } - private Continuation getContinuation(Exchange cxfExchange) { - ContinuationProvider provider = - (ContinuationProvider)cxfExchange.getInMessage().get(ContinuationProvider.class.getName()); - Continuation continuation = provider == null ? null : provider.getContinuation(); - // Make sure we don't return the JMSContinuation, as it doesn't support the Continuation we wants - // Don't want to introduce the dependency of cxf-rt-transprot-jms here - if (continuation != null && continuation.getClass().getName().equals("org.apache.cxf.transport.jms.continuations.JMSContinuation")) { - return null; - } else { - return continuation; - } - } - private Object syncInvoke(Exchange cxfExchange) { - org.apache.camel.Exchange camelExchange = prepareCamelExchange(cxfExchange); - try { - try { - LOG.trace("Processing +++ START +++"); - // send Camel exchange to the target processor - getProcessor().process(camelExchange); - } catch (Exception e) { - throw new Fault(e); - } - - LOG.trace("Processing +++ END +++"); - setResponseBack(cxfExchange, camelExchange); - } finally { - doneUoW(camelExchange); - } - // response should have been set in outMessage's content - return null; - } - - private org.apache.camel.Exchange prepareCamelExchange(Exchange cxfExchange) { - // get CXF binding - CxfEndpoint endpoint = (CxfEndpoint)getEndpoint(); - CxfBinding binding = endpoint.getCxfBinding(); - - // create a Camel exchange, the default MEP is InOut - org.apache.camel.Exchange camelExchange = endpoint.createExchange(); - - DataFormat dataFormat = endpoint.getDataFormat(); - - BindingOperationInfo boi = cxfExchange.getBindingOperationInfo(); - // make sure the "boi" is remained as wrapped in PAYLOAD mode - if (boi != null && dataFormat == DataFormat.PAYLOAD && boi.isUnwrapped()) { - boi = boi.getWrappedOperation(); - cxfExchange.put(BindingOperationInfo.class, boi); - } - - if (boi != null) { - camelExchange.setProperty(BindingOperationInfo.class.getName(), boi); - LOG.trace("Set exchange property: BindingOperationInfo: {}", boi); - // set the message exchange patter with the boi - if (boi.getOperationInfo().isOneWay()) { - camelExchange.setPattern(ExchangePattern.InOnly); - } - } else { - if (cxfEndpoint.getExchangePattern().equals(ExchangePattern.InOnly)) { - camelExchange.setPattern(ExchangePattern.InOnly); - } - } - - - // set data format mode in Camel exchange - camelExchange.setProperty(CxfConstants.DATA_FORMAT_PROPERTY, dataFormat); - LOG.trace("Set Exchange property: {}={}", DataFormat.class.getName(), dataFormat); - - camelExchange.setProperty(Message.MTOM_ENABLED, String.valueOf(endpoint.isMtomEnabled())); - - if (endpoint.getMergeProtocolHeaders()) { - camelExchange.setProperty(CxfConstants.CAMEL_CXF_PROTOCOL_HEADERS_MERGED, Boolean.TRUE); - } - // bind the CXF request into a Camel exchange - binding.populateExchangeFromCxfRequest(cxfExchange, camelExchange); - // extract the javax.xml.ws header - Map<String, Object> context = new HashMap<String, Object>(); - binding.extractJaxWsContext(cxfExchange, context); - // put the context into camelExchange - camelExchange.setProperty(CxfConstants.JAXWS_CONTEXT, context); - - // we want to handle the UoW - try { - CxfConsumer.this.createUoW(camelExchange); - } catch (Exception e) { - log.error("Error processing request", e); - throw new Fault(e); - } - return camelExchange; - } - - @SuppressWarnings("unchecked") - private void setResponseBack(Exchange cxfExchange, org.apache.camel.Exchange camelExchange) { - CxfEndpoint endpoint = (CxfEndpoint)getEndpoint(); - CxfBinding binding = endpoint.getCxfBinding(); - - checkFailure(camelExchange, cxfExchange); - - binding.populateCxfResponseFromExchange(camelExchange, cxfExchange); - - // check failure again as fault could be discovered by converter - checkFailure(camelExchange, cxfExchange); - - // copy the headers javax.xml.ws header back - binding.copyJaxWsContext(cxfExchange, (Map<String, Object>)camelExchange.getProperty(CxfConstants.JAXWS_CONTEXT)); - } - - private void checkFailure(org.apache.camel.Exchange camelExchange, Exchange cxfExchange) throws Fault { - final Throwable t; - if (camelExchange.isFailed()) { - org.apache.camel.Message camelMsg = camelExchange.hasOut() ? camelExchange.getOut() : camelExchange.getIn(); - if (camelMsg.isFault()) { - t = camelMsg.getBody(Throwable.class); - } else { - t = camelExchange.getException(); - } - cxfExchange.getInMessage().put(FaultMode.class, FaultMode.UNCHECKED_APPLICATION_FAULT); - if (t instanceof Fault) { - cxfExchange.getInMessage().put(FaultMode.class, FaultMode.CHECKED_APPLICATION_FAULT); - throw (Fault)t; - } else if (t != null) { - // This is not a CXF Fault. Build the CXF Fault manually. - Fault fault = new Fault(t); - if (fault.getMessage() == null) { - // The Fault has no Message. This is the case if it has - // no message, for example was a NullPointerException. - fault.setMessage(t.getClass().getSimpleName()); - } - WebFault faultAnnotation = t.getClass().getAnnotation(WebFault.class); - Object faultInfo = null; - try { - Method method = t.getClass().getMethod("getFaultInfo"); - faultInfo = method.invoke(t, new Object[0]); - } catch (Exception e) { - // do nothing here - } - if (faultAnnotation != null && faultInfo == null) { - // t has a JAX-WS WebFault annotation, which describes - // in detail the Web Service Fault that should be thrown. Add the - // detail. - Element detail = fault.getOrCreateDetail(); - Element faultDetails = detail.getOwnerDocument() - .createElementNS(faultAnnotation.targetNamespace(), faultAnnotation.name()); - detail.appendChild(faultDetails); - } - - throw fault; - } - - } - } - - }); + svrBean.setInvoker(new CxfConsumerInvoker(endpoint)); server = svrBean.create(); // Apply the server configurer if it is possible if (cxfEndpoint.getCxfEndpointConfigurer() != null) { @@ -289,6 +86,7 @@ public class CxfConsumer extends DefaultConsumer { server.stop(); super.doStop(); } + private EndpointReferenceType getReplyTo(Object o) { try { return (EndpointReferenceType)o.getClass().getMethod("getReplyTo").invoke(o); @@ -296,6 +94,7 @@ public class CxfConsumer extends DefaultConsumer { throw new Fault(t); } } + protected boolean isAsyncInvocationSupported(Exchange cxfExchange) { Message cxfMessage = cxfExchange.getInMessage(); Object addressingProperties = cxfMessage.get(CxfConstants.WSA_HEADERS_INBOUND); @@ -316,5 +115,218 @@ public class CxfConsumer extends DefaultConsumer { public Server getServer() { return server; } - + + private final class CxfConsumerInvoker implements Invoker { + private final CxfEndpoint endpoint; + + public CxfConsumerInvoker(CxfEndpoint endpoint) { + this.endpoint = endpoint; + } + + // we receive a CXF request when this method is called + public Object invoke(Exchange cxfExchange, Object o) { + LOG.trace("Received CXF Request: {}", cxfExchange); + Continuation continuation; + if (!endpoint.isSynchronous() && isAsyncInvocationSupported(cxfExchange) + && (continuation = getContinuation(cxfExchange)) != null) { + LOG.trace("Calling the Camel async processors."); + return asyncInvoke(cxfExchange, continuation); + } else { + LOG.trace("Calling the Camel sync processors."); + return syncInvoke(cxfExchange); + } + } + + // NOTE this code cannot work with CXF 2.2.x and JMSContinuation + // as it doesn't break out the interceptor chain when we call it + private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) { + synchronized (continuation) { + if (continuation.isNew()) { + final org.apache.camel.Exchange camelExchange = prepareCamelExchange(cxfExchange); + + // Now we don't set up the timeout value + LOG.trace("Suspending continuation of exchangeId: {}", camelExchange.getExchangeId()); + + // The continuation could be called before the suspend is called + continuation.suspend(cxfEndpoint.getContinuationTimeout()); + + // use the asynchronous API to process the exchange + getAsyncProcessor().process(camelExchange, new AsyncCallback() { + public void done(boolean doneSync) { + // make sure the continuation resume will not be called before the suspend method in other thread + synchronized (continuation) { + LOG.trace("Resuming continuation of exchangeId: {}", camelExchange.getExchangeId()); + // resume processing after both, sync and async callbacks + continuation.setObject(camelExchange); + continuation.resume(); + } + } + }); + + } else if (continuation.isResumed()) { + org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject(); + try { + setResponseBack(cxfExchange, camelExchange); + } finally { + CxfConsumer.this.doneUoW(camelExchange); + } + + } + } + return null; + } + + private Continuation getContinuation(Exchange cxfExchange) { + ContinuationProvider provider = + (ContinuationProvider)cxfExchange.getInMessage().get(ContinuationProvider.class.getName()); + Continuation continuation = provider == null ? null : provider.getContinuation(); + // Make sure we don't return the JMSContinuation, as it doesn't support the Continuation we wants + // Don't want to introduce the dependency of cxf-rt-transprot-jms here + if (continuation != null && continuation.getClass().getName().equals("org.apache.cxf.transport.jms.continuations.JMSContinuation")) { + return null; + } else { + return continuation; + } + } + + private Object syncInvoke(Exchange cxfExchange) { + org.apache.camel.Exchange camelExchange = prepareCamelExchange(cxfExchange); + try { + try { + LOG.trace("Processing +++ START +++"); + // send Camel exchange to the target processor + getProcessor().process(camelExchange); + } catch (Exception e) { + throw new Fault(e); + } + + LOG.trace("Processing +++ END +++"); + setResponseBack(cxfExchange, camelExchange); + } finally { + doneUoW(camelExchange); + } + // response should have been set in outMessage's content + return null; + } + + private org.apache.camel.Exchange prepareCamelExchange(Exchange cxfExchange) { + // get CXF binding + CxfEndpoint endpoint = (CxfEndpoint)getEndpoint(); + CxfBinding binding = endpoint.getCxfBinding(); + + // create a Camel exchange, the default MEP is InOut + org.apache.camel.Exchange camelExchange = endpoint.createExchange(); + + DataFormat dataFormat = endpoint.getDataFormat(); + + BindingOperationInfo boi = cxfExchange.getBindingOperationInfo(); + // make sure the "boi" is remained as wrapped in PAYLOAD mode + if (boi != null && dataFormat == DataFormat.PAYLOAD && boi.isUnwrapped()) { + boi = boi.getWrappedOperation(); + cxfExchange.put(BindingOperationInfo.class, boi); + } + + if (boi != null) { + camelExchange.setProperty(BindingOperationInfo.class.getName(), boi); + LOG.trace("Set exchange property: BindingOperationInfo: {}", boi); + // set the message exchange patter with the boi + if (boi.getOperationInfo().isOneWay()) { + camelExchange.setPattern(ExchangePattern.InOnly); + } + } else { + if (cxfEndpoint.getExchangePattern().equals(ExchangePattern.InOnly)) { + camelExchange.setPattern(ExchangePattern.InOnly); + } + } + + + // set data format mode in Camel exchange + camelExchange.setProperty(CxfConstants.DATA_FORMAT_PROPERTY, dataFormat); + LOG.trace("Set Exchange property: {}={}", DataFormat.class.getName(), dataFormat); + + camelExchange.setProperty(Message.MTOM_ENABLED, String.valueOf(endpoint.isMtomEnabled())); + + if (endpoint.getMergeProtocolHeaders()) { + camelExchange.setProperty(CxfConstants.CAMEL_CXF_PROTOCOL_HEADERS_MERGED, Boolean.TRUE); + } + // bind the CXF request into a Camel exchange + binding.populateExchangeFromCxfRequest(cxfExchange, camelExchange); + // extract the javax.xml.ws header + Map<String, Object> context = new HashMap<String, Object>(); + binding.extractJaxWsContext(cxfExchange, context); + // put the context into camelExchange + camelExchange.setProperty(CxfConstants.JAXWS_CONTEXT, context); + + // we want to handle the UoW + try { + CxfConsumer.this.createUoW(camelExchange); + } catch (Exception e) { + log.error("Error processing request", e); + throw new Fault(e); + } + return camelExchange; + } + + @SuppressWarnings("unchecked") + private void setResponseBack(Exchange cxfExchange, org.apache.camel.Exchange camelExchange) { + CxfEndpoint endpoint = (CxfEndpoint)getEndpoint(); + CxfBinding binding = endpoint.getCxfBinding(); + + checkFailure(camelExchange, cxfExchange); + + binding.populateCxfResponseFromExchange(camelExchange, cxfExchange); + + // check failure again as fault could be discovered by converter + checkFailure(camelExchange, cxfExchange); + + // copy the headers javax.xml.ws header back + binding.copyJaxWsContext(cxfExchange, (Map<String, Object>)camelExchange.getProperty(CxfConstants.JAXWS_CONTEXT)); + } + + private void checkFailure(org.apache.camel.Exchange camelExchange, Exchange cxfExchange) throws Fault { + final Throwable t; + if (camelExchange.isFailed()) { + org.apache.camel.Message camelMsg = camelExchange.hasOut() ? camelExchange.getOut() : camelExchange.getIn(); + if (camelMsg.isFault()) { + t = camelMsg.getBody(Throwable.class); + } else { + t = camelExchange.getException(); + } + cxfExchange.getInMessage().put(FaultMode.class, FaultMode.UNCHECKED_APPLICATION_FAULT); + if (t instanceof Fault) { + cxfExchange.getInMessage().put(FaultMode.class, FaultMode.CHECKED_APPLICATION_FAULT); + throw (Fault)t; + } else if (t != null) { + // This is not a CXF Fault. Build the CXF Fault manually. + Fault fault = new Fault(t); + if (fault.getMessage() == null) { + // The Fault has no Message. This is the case if it has + // no message, for example was a NullPointerException. + fault.setMessage(t.getClass().getSimpleName()); + } + WebFault faultAnnotation = t.getClass().getAnnotation(WebFault.class); + Object faultInfo = null; + try { + Method method = t.getClass().getMethod("getFaultInfo"); + faultInfo = method.invoke(t, new Object[0]); + } catch (Exception e) { + // do nothing here + } + if (faultAnnotation != null && faultInfo == null) { + // t has a JAX-WS WebFault annotation, which describes + // in detail the Web Service Fault that should be thrown. Add the + // detail. + Element detail = fault.getOrCreateDetail(); + Element faultDetails = detail.getOwnerDocument() + .createElementNS(faultAnnotation.targetNamespace(), faultAnnotation.name()); + detail.appendChild(faultDetails); + } + + throw fault; + } + + } + } + + } } http://git-wip-us.apache.org/repos/asf/camel/blob/63f64c7d/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfPayloadConsumerDuplicateNamespaceStreamCacheTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfPayloadConsumerDuplicateNamespaceStreamCacheTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfPayloadConsumerDuplicateNamespaceStreamCacheTest.java index ace11a7..3e258c8 100644 --- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfPayloadConsumerDuplicateNamespaceStreamCacheTest.java +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfPayloadConsumerDuplicateNamespaceStreamCacheTest.java @@ -25,7 +25,7 @@ public class CxfPayloadConsumerDuplicateNamespaceStreamCacheTest extends CxfPayl * be an invalid XML. */ protected static final String REQUEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xs=\"http://www.w3.org/2001/XMLSchema\">" - + "<soap:Body><ns2:getToken xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\" xmlns:ns2=\"http://camel.apache.org/cxf/namespace\"><arg0 xsi:type=\"xs:string\">Send</arg0></ns2:getToken></soap:Body></soap:Envelope>"; + + "<soap:Body><ns2:getToken xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\" xmlns:ns2=\"http://camel.apache.org/cxf/namespace\"><arg0 xsi:type=\"xs:string\">Send</arg0></ns2:getToken></soap:Body></soap:Envelope>"; @Test public void testInvokeRouter() { http://git-wip-us.apache.org/repos/asf/camel/blob/63f64c7d/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpCommonEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpCommonEndpoint.java b/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpCommonEndpoint.java index 40e6d5b..83bf92c 100644 --- a/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpCommonEndpoint.java +++ b/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpCommonEndpoint.java @@ -47,9 +47,9 @@ public abstract class HttpCommonEndpoint extends DefaultEndpoint implements Head + " You may also set the option throwExceptionOnFailure to be false to let the HttpProducer send all the fault response back.") boolean bridgeEndpoint; @UriParam(label = "producer", - description = "If the option is true, HttpProducer will set the Host header to the value contained in the current exchange Host header, " + - "useful in reverse proxy applications where you want the Host header received by the downstream server to reflect the URL called by the upstream client, " + - "this allows applications which use the Host header to generate accurate URL's for a proxied service") + description = "If the option is true, HttpProducer will set the Host header to the value contained in the current exchange Host header, " + + "useful in reverse proxy applications where you want the Host header received by the downstream server to reflect the URL called by the upstream client, " + + "this allows applications which use the Host header to generate accurate URL's for a proxied service") boolean preserveHostHeader; @UriParam(label = "consumer", description = "Whether or not the consumer should try to find a target consumer by matching the URI prefix if no exact match is found.")