Author: ningjiang
Date: Mon Feb 20 14:02:01 2012
New Revision: 1291263

URL: http://svn.apache.org/viewvc?rev=1291263&view=rev
Log:
CAMEL-5017 Added a GroupStrategy API to the camel-stream component

Added:
    
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/DefaultGroupStrategy.java
    
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/GroupStrategy.java
    
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java
Modified:
    
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
    
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java

Added: 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/DefaultGroupStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/DefaultGroupStrategy.java?rev=1291263&view=auto
==============================================================================
--- 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/DefaultGroupStrategy.java
 (added)
+++ 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/DefaultGroupStrategy.java
 Mon Feb 20 14:02:01 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stream;
+
+import java.util.List;
+
+
+public class DefaultGroupStrategy implements GroupStrategy {
+
+    @Override
+    public Object groupLines(List<String> lines) {
+        return lines;
+    }
+
+}

Added: 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/GroupStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/GroupStrategy.java?rev=1291263&view=auto
==============================================================================
--- 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/GroupStrategy.java
 (added)
+++ 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/GroupStrategy.java
 Mon Feb 20 14:02:01 2012
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stream;
+
+import java.util.List;
+
+public interface GroupStrategy {
+    
+    Object groupLines(List<String> lines);
+
+}

Modified: 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=1291263&r1=1291262&r2=1291263&view=diff
==============================================================================
--- 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
 (original)
+++ 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
 Mon Feb 20 14:02:01 2012
@@ -57,7 +57,7 @@ public class StreamConsumer extends Defa
     private StreamEndpoint endpoint;
     private String uri;
     private boolean initialPromptDone;
-    private final List<Object> lines = new CopyOnWriteArrayList<Object>();
+    private final List<String> lines = new CopyOnWriteArrayList<String>();
 
     public StreamConsumer(StreamEndpoint endpoint, Processor processor, String 
uri) throws Exception {
         super(endpoint, processor);
@@ -166,7 +166,7 @@ public class StreamConsumer extends Defa
     /**
      * Strategy method for processing the line
      */
-    protected synchronized void processLine(Object line) throws Exception {
+    protected synchronized void processLine(String line) throws Exception {
         if (endpoint.getGroupLines() > 0) {
             // remember line
             lines.add(line);
@@ -178,8 +178,8 @@ public class StreamConsumer extends Defa
 
                 // create message with the lines
                 Message msg = new DefaultMessage();
-                List<Object> copy = new ArrayList<Object>(lines);
-                msg.setBody(copy);
+                List<String> copy = new ArrayList<String>(lines);
+                msg.setBody(endpoint.getGroupStrategy().groupLines(copy));
                 exchange.setIn(msg);
 
                 // clear lines

Modified: 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java?rev=1291263&r1=1291262&r2=1291263&view=diff
==============================================================================
--- 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
 Mon Feb 20 14:02:01 2012
@@ -41,6 +41,7 @@ public class StreamEndpoint extends Defa
     private long initialPromptDelay = 2000;
     private int groupLines;
     private Charset charset;
+    private GroupStrategy groupStrategy = new DefaultGroupStrategy();
 
     public StreamEndpoint(String endpointUri, Component component) throws 
Exception {
         super(endpointUri, component);
@@ -129,6 +130,14 @@ public class StreamEndpoint extends Defa
     public void setScanStream(boolean scanStream) {
         this.scanStream = scanStream;
     }
+    
+    public GroupStrategy getGroupStrategy() {
+        return groupStrategy;
+    }
+    
+    public void setGroupStrategy(GroupStrategy strategy) {
+        this.groupStrategy = strategy;
+    }
 
     public boolean isRetry() {
         return retry;

Added: 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java?rev=1291263&view=auto
==============================================================================
--- 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java
 (added)
+++ 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java
 Mon Feb 20 14:02:01 2012
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stream;
+
+import java.util.List;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+
+public class StreamGroupLinesStrategyTest extends StreamGroupLinesTest {
+    
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myGroupStrategy", new MyGroupStrategy());
+        return jndi;
+    }
+    
+    class MyGroupStrategy implements GroupStrategy {
+
+        @Override
+        public Object groupLines(List<String> lines) {
+            StringBuffer buffer = new StringBuffer();
+            for (String line : lines) {
+                buffer.append(line);
+                buffer.append("\n");
+            }
+            return buffer.toString();
+        }
+    }
+    
+    @Test
+    public void testGroupLines() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(2);
+
+        assertMockEndpointsSatisfied();
+
+        Object result = mock.getExchanges().get(0).getIn().getBody();
+        assertEquals("Get a wrong result.", "A\nB\nC\n", result);
+        
+
+        Object result2 = mock.getExchanges().get(1).getIn().getBody();
+        assertEquals("Get a wrong result.", "D\nE\nF\n", result2);
+        
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("stream:file?fileName=target/stream/streamfile.txt&groupLines=3&groupStrategy=#myGroupStrategy").to("mock:result");
+            }
+        };
+    }
+
+}


Reply via email to