Repository: camel Updated Branches: refs/heads/master 044c3ad19 -> 88664983b
CAMEL-9010: camel-stream in scan stream mode should deal if the file is temporary not there. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/88664983 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/88664983 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/88664983 Branch: refs/heads/master Commit: 88664983bfea24a8e741acac6e3343e53b0267c2 Parents: 044c3ad Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Jul 24 10:45:11 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jul 24 10:45:11 2015 +0200 ---------------------------------------------------------------------- .../camel/component/stream/StreamConsumer.java | 25 ++++++-- .../stream/ScanStreamFileManualTest.java | 61 ++++++++++++++++++++ 2 files changed, 81 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/88664983/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java index 5c32a64..f841957 100644 --- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java +++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java @@ -68,7 +68,10 @@ public class StreamConsumer extends DefaultConsumer implements Runnable { protected void doStart() throws Exception { super.doStart(); - initializeStream(); + // if we scan the stream we are lenient and can wait for the stream to be available later + if (!endpoint.isScanStream()) { + initializeStream(); + } executor = endpoint.getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, endpoint.getEndpointUri()); executor.execute(this); @@ -115,8 +118,13 @@ public class StreamConsumer extends DefaultConsumer implements Runnable { inputStream = resolveStreamFromUrl(); inputStreamToClose = inputStream; } - Charset charset = endpoint.getCharset(); - return IOHelper.buffered(new InputStreamReader(inputStream, charset)); + + if (inputStream != null) { + Charset charset = endpoint.getCharset(); + return IOHelper.buffered(new InputStreamReader(inputStream, charset)); + } else { + return null; + } } private void readFromStream() throws Exception { @@ -127,8 +135,12 @@ public class StreamConsumer extends DefaultConsumer implements Runnable { if (endpoint.isScanStream()) { // repeat scanning from stream while (isRunAllowed()) { - line = br.readLine(); - LOG.trace("Read line: {}", line); + if (br != null) { + line = br.readLine(); + LOG.trace("Read line: {}", line); + } else { + line = null; + } boolean eos = line == null; if (!eos && isRunAllowed()) { index = processLine(line, false, index); @@ -267,6 +279,9 @@ public class StreamConsumer extends DefaultConsumer implements Runnable { if (file.canRead()) { fileStream = new FileInputStream(file); + } else if (endpoint.isScanStream()) { + // if we scan the stream then it may not be available and we should return null + fileStream = null; } else { throw new IllegalArgumentException(INVALID_URI); } http://git-wip-us.apache.org/repos/asf/camel/blob/88664983/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileManualTest.java ---------------------------------------------------------------------- diff --git a/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileManualTest.java b/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileManualTest.java new file mode 100644 index 0000000..f296350 --- /dev/null +++ b/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileManualTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stream; + +import java.io.File; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Unit test for scan stream file + */ +@Ignore("For manual testing") +public class ScanStreamFileManualTest extends CamelTestSupport { + + private File file; + + @Override + @Before + public void setUp() throws Exception { + deleteDirectory("target/stream"); + createDirectory("target/stream"); + + file = new File("target/stream/scanstreamfile.txt"); + file.createNewFile(); + + super.setUp(); + } + + @Test + public void testScanFile() throws Exception { + Thread.sleep(60000); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("stream:file?fileName=target/stream/scanstreamfile.txt&scanStream=true&scanStreamDelay=200&retry=true") + .to("log:line"); + } + }; + } + +}