Author: slewis Date: Thu Jun 24 19:22:31 2010 New Revision: 957691 URL: http://svn.apache.org/viewvc?rev=957691&view=rev Log: CAMEL-2853 - camel-stream - tailing logfile does not seem to work when logfile is rolled over
Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=957691&r1=957690&r2=957691&view=diff ============================================================================== --- camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original) +++ camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Thu Jun 24 19:22:31 2010 @@ -64,13 +64,7 @@ public class StreamConsumer extends Defa protected void doStart() throws Exception { super.doStart(); - if ("in".equals(uri)) { - inputStream = System.in; - } else if ("file".equals(uri)) { - inputStream = resolveStreamFromFile(); - } else if ("url".equals(uri)) { - inputStream = resolveStreamFromUrl(); - } + initializeStream(); executor = endpoint.getCamelContext().getExecutorServiceStrategy().newSingleThreadExecutor(this, endpoint.getEndpointUri()); executor.execute(this); @@ -95,10 +89,22 @@ public class StreamConsumer extends Defa } } - private void readFromStream() throws Exception { + private BufferedReader initializeStream() throws Exception { + if ("in".equals(uri)) { + inputStream = System.in; + } else if ("file".equals(uri)) { + inputStream = resolveStreamFromFile(); + } else if ("url".equals(uri)) { + inputStream = resolveStreamFromUrl(); + } Charset charset = endpoint.getCharset(); BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, charset)); + return br; + } + + private void readFromStream() throws Exception { String line; + BufferedReader br = initializeStream(); if (endpoint.isScanStream()) { // repeat scanning from stream @@ -110,6 +116,9 @@ public class StreamConsumer extends Defa boolean eos = line == null; if (!eos && isRunAllowed()) { processLine(line); + } else if (eos && isRunAllowed()) { + //try and re-open stream + br = initializeStream(); } try { Thread.sleep(endpoint.getScanStreamDelay()); Modified: camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java?rev=957691&r1=957690&r2=957691&view=diff ============================================================================== --- camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java (original) +++ camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java Thu Jun 24 19:22:31 2010 @@ -60,6 +60,37 @@ public class ScanStreamFileTest extends fos.close(); } + @Test + public void testScanRefreshedFile() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello", "there", "World", "!"); + + FileOutputStream fos = refreshFile(null); + fos.write("Hello\n".getBytes()); + Thread.sleep(150); + fos.write("there\n".getBytes()); + fos = refreshFile(fos); + Thread.sleep(150); + fos.write("World\n".getBytes()); + Thread.sleep(150); + fos = refreshFile(fos); + Thread.sleep(150); + fos.write("!\n".getBytes()); + + assertMockEndpointsSatisfied(); + + fos.close(); + } + + private FileOutputStream refreshFile(FileOutputStream fos) throws Exception { + if (fos != null) { + fos.close(); + } + file.delete(); + file.createNewFile(); + return new FileOutputStream(file); + } + protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { @@ -68,4 +99,4 @@ public class ScanStreamFileTest extends }; } -} \ No newline at end of file +}