Updated Branches:
  refs/heads/master 90e08bb30 -> 323008c50

CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to 
configure and allow 3rd party to plugin custom strategies. Work in progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0f254e82
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0f254e82
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0f254e82

Branch: refs/heads/master
Commit: 0f254e823bf70baf4ca0f10c8c6524619f12425b
Parents: 90e08bb
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat Jul 20 11:57:06 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Jul 20 12:10:03 2013 +0200

----------------------------------------------------------------------
 .../camel/processor/CamelInternalProcessor.java |  9 ++++-
 .../processor/interceptor/DefaultChannel.java   |  7 +---
 .../StreamCachingResetProcessor.java            | 40 --------------------
 .../org/apache/camel/util/MessageHelper.java    |  5 ++-
 4 files changed, 11 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0f254e82/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index ff7f8f5..b93496e 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -737,8 +737,13 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
         public StreamCache before(Exchange exchange) throws Exception {
             // check if body is already cached
             Object body = exchange.getIn().getBody();
-            if (body == null || body instanceof StreamCache) {
-                return (StreamCache) body;
+            if (body == null) {
+                return null;
+            } else if (body instanceof StreamCache) {
+                StreamCache sc = (StreamCache) body;
+                // reset so the cache is ready to be used before processing
+                sc.reset();
+                return sc;
             }
             // cache the body and if we could do that replace it as the new body
             StreamCache sc = strategy.cache(exchange);

http://git-wip-us.apache.org/repos/asf/camel/blob/0f254e82/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
index 1ca4064..6af5e96 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
@@ -191,12 +191,7 @@ public class DefaultChannel extends CamelInternalProcessor implements ModelChann
         // force the creation of an id
         RouteDefinitionHelper.forceAssignIds(routeContext.getCamelContext(), definition);
 
-        // first wrap with stream caching reset
-        if (routeContext.isStreamCaching()) {
-            target = new StreamCachingResetProcessor(target);
-        }
-
-        // the wrap the output with the managed strategy if any
+        // first wrap the output with the managed strategy if any
         InterceptStrategy managed = routeContext.getManagedInterceptStrategy();
         if (managed != null) {
             next = target == nextProcessor ? null : nextProcessor;

http://git-wip-us.apache.org/repos/asf/camel/blob/0f254e82/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingResetProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingResetProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingResetProcessor.java
deleted file mode 100644
index 57b5c96..0000000
--- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingResetProcessor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor.interceptor;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.processor.DelegateAsyncProcessor;
-import org.apache.camel.util.MessageHelper;
-
-/**
- * {@link Processor} to reset {@link org.apache.camel.StreamCache} to ensure the stream
- * is ready and re-readable for processing.
- */
-public class StreamCachingResetProcessor extends DelegateAsyncProcessor {
-
-    public StreamCachingResetProcessor(Processor processor) {
-        super(processor);
-    }
-
-    @Override
-    public boolean process(Exchange exchange, AsyncCallback callback) {
-        MessageHelper.resetStreamCache(exchange.getIn());
-        return super.process(exchange, callback);
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/0f254e82/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java b/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java
index d27418f..3e38d23 100644
--- a/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java
@@ -114,8 +114,9 @@ public final class MessageHelper {
         if (message == null) {
             return;
         }
-        if (message.getBody() instanceof StreamCache) {
-            ((StreamCache)message.getBody()).reset();
+        Object body = message.getBody();
+        if (body != null && body instanceof StreamCache) {
+            ((StreamCache) body).reset();
         }
     }
 

Reply via email to