Author: ningjiang
Date: Tue Mar 12 08:02:53 2013
New Revision: 1455432

URL: http://svn.apache.org/r1455432
Log:
CAMEL-6155, CAMEL-6147: improve stream:file so that it can close the stream 
automatically when it detects that the splitter is done

Added:
    
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/SplitStreamFileTest.java
Modified:
    
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
    
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java

Modified: 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java?rev=1455432&r1=1455431&r2=1455432&view=diff
==============================================================================
--- 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
 Tue Mar 12 08:02:53 2013
@@ -34,6 +34,7 @@ public class StreamEndpoint extends Defa
     private String fileName;
     private boolean scanStream;
     private boolean retry;
+    private boolean closeOnDone;
     private long scanStreamDelay;
     private long delay;
     private String encoding;
@@ -149,6 +150,14 @@ public class StreamEndpoint extends Defa
     public void setRetry(boolean retry) {
         this.retry = retry;
     }
+    /** Returns the closeOnDone flag; StreamProducer closes the stream when it is set and the split is complete. */
+    public boolean isCloseOnDone() {
+        return closeOnDone;
+    }
+    /** Sets the closeOnDone flag that is read back through isCloseOnDone(). */
+    public void setCloseOnDone(boolean closeOnDone) {
+        this.closeOnDone = closeOnDone;
+    }
 
     public long getScanStreamDelay() {
         return scanStreamDelay;

Modified: 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java?rev=1455432&r1=1455431&r2=1455432&view=diff
==============================================================================
--- 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
 (original)
+++ 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
 Tue Mar 12 08:02:53 2013
@@ -67,7 +67,7 @@ public class StreamProducer extends Defa
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        closeStream(true);
+        closeStream(null, true);
     }
 
     public void process(Exchange exchange) throws Exception {
@@ -76,7 +76,7 @@ public class StreamProducer extends Defa
         synchronized (this) {
             openStream(exchange);
             writeToStream(outputStream, exchange);
-            closeStream(false);
+            closeStream(exchange, false);
         }
     }
 
@@ -172,18 +172,23 @@ public class StreamProducer extends Defa
             openStream();
         }
     }
+    /** Returns true only for a non-null exchange whose Exchange.SPLIT_COMPLETE property (default false) is true. */
+    private boolean isDone(Exchange exchange) {
+        return exchange != null && exchange.getProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE, Boolean.class);
+    }
 
-    private synchronized void closeStream(boolean force) throws Exception {
+    private synchronized void closeStream(Exchange exchange, boolean force) 
throws Exception {
         if (outputStream == null) {
             return;
         }
 
         // never close a standard stream (system.out or system.err)
         // always close a 'header' stream (unless it's a system stream)
-        boolean systemStream = outputStream != System.out || outputStream != 
System.err;
+        boolean systemStream = outputStream == System.out || outputStream == 
System.err;
         boolean headerStream = "header".equals(uri);
         boolean reachedLimit = endpoint.getAutoCloseCount() > 0 && 
count.decrementAndGet() <= 0;
-        boolean expiredStream = force || headerStream || reachedLimit;  // 
evaluation order is important!
+        boolean isDone = endpoint.isCloseOnDone() && isDone(exchange);
+        boolean expiredStream = force || headerStream || isDone || 
reachedLimit;  // evaluation order is important!
 
         // never ever close a system stream
         if (!systemStream && expiredStream) {

Added: 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/SplitStreamFileTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/SplitStreamFileTest.java?rev=1455432&view=auto
==============================================================================
--- 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/SplitStreamFileTest.java
 (added)
+++ 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/SplitStreamFileTest.java
 Tue Mar 12 08:02:53 2013
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stream;
+
+import java.io.File;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.converter.IOConverter;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+/** Splits two comma-separated bodies into stream:file with closeOnDone enabled and checks the file holds every token. */
+public class SplitStreamFileTest extends CamelTestSupport {
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        deleteDirectory("target/stream");
+        createDirectory("target/stream");
+        // recreate the output directory before delegating to the parent setUp
+        super.setUp();
+    }
+
+    @Test
+    public void testCloseFileOnDone() throws Exception {
+        template.sendBody("direct:start", "A,B,C,D");
+        template.sendBody("direct:start", "A,B");
+        // tokens from both messages are expected in the file, one per line
+        File file = new File("target/stream/splitFile.txt");
+        String result = IOConverter.toString(file, new DefaultExchange(context));
+        assertEquals("Get a wrong result", "A\nB\nC\nD\nA\nB\n", result);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start")
+                        .split(body().tokenize(","))
+                        .to("stream:file?fileName=target/stream/splitFile.txt&closeOnDone=true");
+            }
+        };
+    }
+
+}


Reply via email to