Updated Branches: refs/heads/master 90e08bb30 -> 323008c50
CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow 3rd party to plugin custom strategies. Work in progress. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0f254e82 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0f254e82 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0f254e82 Branch: refs/heads/master Commit: 0f254e823bf70baf4ca0f10c8c6524619f12425b Parents: 90e08bb Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Jul 20 11:57:06 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Jul 20 12:10:03 2013 +0200 ---------------------------------------------------------------------- .../camel/processor/CamelInternalProcessor.java | 9 ++++- .../processor/interceptor/DefaultChannel.java | 7 +--- .../StreamCachingResetProcessor.java | 40 -------------------- .../org/apache/camel/util/MessageHelper.java | 5 ++- 4 files changed, 11 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0f254e82/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index ff7f8f5..b93496e 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -737,8 +737,13 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { public StreamCache before(Exchange exchange) throws Exception { // check if body is already cached Object body = exchange.getIn().getBody(); - if (body == null || body instanceof StreamCache) { - return (StreamCache) body; + if (body == null) { + return null; + } else if (body instanceof StreamCache) { + StreamCache sc = (StreamCache) body; + // reset so the cache is ready to be used before processing + sc.reset(); + return sc; } // cache the body and if we could do that replace it as the new body StreamCache sc = strategy.cache(exchange); http://git-wip-us.apache.org/repos/asf/camel/blob/0f254e82/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java index 1ca4064..6af5e96 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java @@ -191,12 +191,7 @@ public class DefaultChannel extends CamelInternalProcessor implements ModelChann // force the creation of an id RouteDefinitionHelper.forceAssignIds(routeContext.getCamelContext(), definition); - // first wrap with stream caching reset - if (routeContext.isStreamCaching()) { - target = new StreamCachingResetProcessor(target); - } - - // the wrap the output with the managed strategy if any + // first wrap the output with the managed strategy if any InterceptStrategy managed = routeContext.getManagedInterceptStrategy(); if (managed != null) { next = target == nextProcessor ? null : nextProcessor; http://git-wip-us.apache.org/repos/asf/camel/blob/0f254e82/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingResetProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingResetProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingResetProcessor.java deleted file mode 100644 index 57b5c96..0000000 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingResetProcessor.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.processor.interceptor; - -import org.apache.camel.AsyncCallback; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.processor.DelegateAsyncProcessor; -import org.apache.camel.util.MessageHelper; - -/** - * {@link Processor} to reset {@link org.apache.camel.StreamCache} to ensure the stream - * is ready and re-readable for processing. - */ -public class StreamCachingResetProcessor extends DelegateAsyncProcessor { - - public StreamCachingResetProcessor(Processor processor) { - super(processor); - } - - @Override - public boolean process(Exchange exchange, AsyncCallback callback) { - MessageHelper.resetStreamCache(exchange.getIn()); - return super.process(exchange, callback); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/0f254e82/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java b/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java index d27418f..3e38d23 100644 --- a/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java @@ -114,8 +114,9 @@ public final class MessageHelper { if (message == null) { return; } - if (message.getBody() instanceof StreamCache) { - ((StreamCache)message.getBody()).reset(); + Object body = message.getBody(); + if (body != null && body instanceof StreamCache) { + ((StreamCache) body).reset(); } }