Repository: camel Updated Branches: refs/heads/master 0437f7db6 -> 92becd2a9
[CAMEL-9821]add mep uri param for camel-cxf endpoint Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/92becd2a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/92becd2a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/92becd2a Branch: refs/heads/master Commit: 92becd2a94dc0135dee0b92c8caff31eb51dae31 Parents: 0437f7d Author: Freeman Fang <freeman.f...@gmail.com> Authored: Wed Apr 6 10:34:45 2016 +0800 Committer: Freeman Fang <freeman.f...@gmail.com> Committed: Wed Apr 6 10:34:45 2016 +0800 ---------------------------------------------------------------------- .../apache/camel/component/cxf/CxfConsumer.java | 5 ++ .../apache/camel/component/cxf/CxfEndpoint.java | 19 +++++++ .../cxf/feature/RAWDataFormatFeature.java | 26 ++++++++++ .../OneWayOutgoingChainInterceptor.java | 52 ++++++++++++++++++++ 4 files changed, 102 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/92becd2a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java index fab7267..5b32832 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java @@ -172,7 +172,12 @@ public class CxfConsumer extends DefaultConsumer { if (boi.getOperationInfo().isOneWay()) { camelExchange.setPattern(ExchangePattern.InOnly); } + } else { + if (cxfEndpoint.getMep() != null && cxfEndpoint.getMep().equals("InOnly")) { + camelExchange.setPattern(ExchangePattern.InOnly); + } } + // set data format mode in Camel exchange camelExchange.setProperty(CxfConstants.DATA_FORMAT_PROPERTY, dataFormat); http://git-wip-us.apache.org/repos/asf/camel/blob/92becd2a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java index 8dd5442..12b9ce0 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java @@ -192,6 +192,8 @@ public class CxfEndpoint extends DefaultEndpoint implements HeaderFilterStrategy private String password; @UriParam(label = "advanced", prefix = "properties.", multiValue = true) private Map<String, Object> properties; + @UriParam(name = "mep") + private String mep; public CxfEndpoint() { } @@ -309,6 +311,11 @@ public class CxfEndpoint extends DefaultEndpoint implements HeaderFilterStrategy sfb.setDataBinding(new SourceDataBinding()); } else if (getDataFormat().dealias() == DataFormat.RAW) { RAWDataFormatFeature feature = new RAWDataFormatFeature(); + if (this.getMep() != null && this.getMep().equals("InOnly")) { + //if DataFormat is RAW|MESSAGE, can't read message so can't + //determine it's oneway so need get the MEP from URI explicitly + feature.setOneway(true); + } feature.addInIntercepters(getInInterceptors()); feature.addOutInterceptors(getOutInterceptors()); sfb.getFeatures().add(feature); @@ -1120,6 +1127,18 @@ public class CxfEndpoint extends DefaultEndpoint implements HeaderFilterStrategy this.username = username; } + public String getMep() { + return mep; + } + + /** + * The Message Exchange Pattern + */ + public void setMep(String mep) { + this.mep = mep; + } + + /** * We need to override the {@link ClientImpl#setParameters} method * to insert parameters into CXF Message for {@link DataFormat#PAYLOAD} mode. http://git-wip-us.apache.org/repos/asf/camel/blob/92becd2a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java index ead2489..22f67fc 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java @@ -17,12 +17,16 @@ package org.apache.camel.component.cxf.feature; +import org.apache.camel.component.cxf.interceptors.OneWayOutgoingChainInterceptor; import org.apache.camel.component.cxf.interceptors.RawMessageContentRedirectInterceptor; import org.apache.camel.component.cxf.interceptors.RawMessageWSDLGetInterceptor; import org.apache.cxf.Bus; import org.apache.cxf.endpoint.Client; import org.apache.cxf.endpoint.Server; +import org.apache.cxf.interceptor.Interceptor; import org.apache.cxf.interceptor.LoggingOutInterceptor; +import org.apache.cxf.interceptor.OneWayProcessorInterceptor; +import org.apache.cxf.message.Message; import org.apache.cxf.phase.Phase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +53,8 @@ public class RAWDataFormatFeature extends AbstractDataFormatFeature { // filter the unused in phase interceptor private static final String[] REMAINING_OUT_PHASES = {Phase.PREPARE_SEND, Phase.USER_STREAM, Phase.WRITE, Phase.SEND, Phase.PREPARE_SEND_ENDING}; + + private boolean oneway; @Override public void initialize(Client client, Bus bus) { @@ -89,12 +95,32 @@ public class RAWDataFormatFeature extends AbstractDataFormatFeature { // setup the RawMessageWSDLGetInterceptor server.getEndpoint().getInInterceptors().add(RawMessageWSDLGetInterceptor.INSTANCE); + // Oneway with RAW message + if (isOneway()) { + Interceptor<? extends Message> toRemove = null; + for (Interceptor<? extends Message> i : server.getEndpoint().getService().getInInterceptors()) { + if (i.getClass().getName().equals("org.apache.cxf.interceptor.OutgoingChainInterceptor")) { + toRemove = i; + } + } + server.getEndpoint().getService().getInInterceptors().remove(toRemove); + server.getEndpoint().getInInterceptors().add(new OneWayOutgoingChainInterceptor()); + server.getEndpoint().getInInterceptors().add(new OneWayProcessorInterceptor()); + } } @Override protected Logger getLogger() { return LOG; } + + public boolean isOneway() { + return oneway; + } + + public void setOneway(boolean oneway) { + this.oneway = oneway; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/92becd2a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/OneWayOutgoingChainInterceptor.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/OneWayOutgoingChainInterceptor.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/OneWayOutgoingChainInterceptor.java new file mode 100644 index 0000000..f586847 --- /dev/null +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/OneWayOutgoingChainInterceptor.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.cxf.interceptors; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.cxf.interceptor.OutgoingChainInterceptor; +import org.apache.cxf.message.Message; +import org.apache.cxf.phase.AbstractPhaseInterceptor; +import org.apache.cxf.phase.Phase; + + +public class OneWayOutgoingChainInterceptor extends AbstractPhaseInterceptor<Message> { + + public OneWayOutgoingChainInterceptor() { + super(Phase.POST_INVOKE); + this.addBefore(OutgoingChainInterceptor.class.getName()); + } + + public void handleMessage(Message message) { + closeInput(message); + return; + } + + private void closeInput(Message message) { + InputStream is = message.getContent(InputStream.class); + if (is != null) { + try { + is.close(); + message.removeContent(InputStream.class); + } catch (IOException ioex) { + //ignore + } + } + } +}