Repository: camel
Updated Branches:
  refs/heads/camel-2.15.x de334d0d5 -> 759435a88


CAMEL-9010: camel-stream in scan stream mode should deal if the file is 
temporary not there.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/759435a8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/759435a8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/759435a8

Branch: refs/heads/camel-2.15.x
Commit: 759435a88e7611ddb781416571c6d93d70853007
Parents: de334d0
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Jul 24 10:45:11 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Jul 24 10:46:11 2015 +0200

----------------------------------------------------------------------
 .../camel/component/stream/StreamConsumer.java  | 25 ++++++--
 .../stream/ScanStreamFileManualTest.java        | 61 ++++++++++++++++++++
 2 files changed, 81 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/759435a8/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
index 5c32a64..f841957 100644
--- 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
+++ 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
@@ -68,7 +68,10 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
     protected void doStart() throws Exception {
         super.doStart();
 
-        initializeStream();
+        // if we scan the stream we are lenient and can wait for the stream to 
be available later
+        if (!endpoint.isScanStream()) {
+            initializeStream();
+        }
 
         executor = 
endpoint.getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this,
 endpoint.getEndpointUri());
         executor.execute(this);
@@ -115,8 +118,13 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
             inputStream = resolveStreamFromUrl();
             inputStreamToClose = inputStream;
         }
-        Charset charset = endpoint.getCharset();
-        return IOHelper.buffered(new InputStreamReader(inputStream, charset));
+
+        if (inputStream != null) {
+            Charset charset = endpoint.getCharset();
+            return IOHelper.buffered(new InputStreamReader(inputStream, 
charset));
+        } else {
+            return null;
+        }
     }
 
     private void readFromStream() throws Exception {
@@ -127,8 +135,12 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
         if (endpoint.isScanStream()) {
             // repeat scanning from stream
             while (isRunAllowed()) {
-                line = br.readLine();
-                LOG.trace("Read line: {}", line);
+                if (br != null) {
+                    line = br.readLine();
+                    LOG.trace("Read line: {}", line);
+                } else {
+                    line = null;
+                }
                 boolean eos = line == null;
                 if (!eos && isRunAllowed()) {
                     index = processLine(line, false, index);
@@ -267,6 +279,9 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
 
         if (file.canRead()) {
             fileStream = new FileInputStream(file);
+        } else if (endpoint.isScanStream()) {
+            // if we scan the stream then it may not be available and we 
should return null
+            fileStream = null;
         } else {
             throw new IllegalArgumentException(INVALID_URI);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/759435a8/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileManualTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileManualTest.java
 
b/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileManualTest.java
new file mode 100644
index 0000000..f296350
--- /dev/null
+++ 
b/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileManualTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stream;
+
+import java.io.File;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Unit test for scan stream file
+ */
+@Ignore("For manual testing")
+public class ScanStreamFileManualTest extends CamelTestSupport {
+
+    private File file;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        deleteDirectory("target/stream");
+        createDirectory("target/stream");
+
+        file = new File("target/stream/scanstreamfile.txt");
+        file.createNewFile();
+
+        super.setUp();
+    }
+
+    @Test
+    public void testScanFile() throws Exception {
+        Thread.sleep(60000);
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                
from("stream:file?fileName=target/stream/scanstreamfile.txt&scanStream=true&scanStreamDelay=200&retry=true")
+                        .to("log:line");
+            }
+        };
+    }
+
+}

Reply via email to