Author: ningjiang Date: Mon Feb 20 14:02:01 2012 New Revision: 1291263 URL: http://svn.apache.org/viewvc?rev=1291263&view=rev Log: CAMEL-5017 Added a GroupStrategy API to the camel-stream component
Added: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/DefaultGroupStrategy.java camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/GroupStrategy.java camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java Added: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/DefaultGroupStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/DefaultGroupStrategy.java?rev=1291263&view=auto ============================================================================== --- camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/DefaultGroupStrategy.java (added) +++ camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/DefaultGroupStrategy.java Mon Feb 20 14:02:01 2012 @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stream; + +import java.util.List; + + +public class DefaultGroupStrategy implements GroupStrategy { + + @Override + public Object groupLines(List<String> lines) { + return lines; + } + +} Added: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/GroupStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/GroupStrategy.java?rev=1291263&view=auto ============================================================================== --- camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/GroupStrategy.java (added) +++ camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/GroupStrategy.java Mon Feb 20 14:02:01 2012 @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stream; + +import java.util.List; + +public interface GroupStrategy { + + Object groupLines(List<String> lines); + +} Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=1291263&r1=1291262&r2=1291263&view=diff ============================================================================== --- camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original) +++ camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Mon Feb 20 14:02:01 2012 @@ -57,7 +57,7 @@ public class StreamConsumer extends Defa private StreamEndpoint endpoint; private String uri; private boolean initialPromptDone; - private final List<Object> lines = new CopyOnWriteArrayList<Object>(); + private final List<String> lines = new CopyOnWriteArrayList<String>(); public StreamConsumer(StreamEndpoint endpoint, Processor processor, String uri) throws Exception { super(endpoint, processor); @@ -166,7 +166,7 @@ public class StreamConsumer extends Defa /** * Strategy method for processing the line */ - protected synchronized void processLine(Object line) throws Exception { + protected synchronized void processLine(String line) throws Exception { if (endpoint.getGroupLines() > 0) { // remember line lines.add(line); @@ -178,8 +178,8 @@ public class StreamConsumer extends Defa // create message with the lines Message msg = new DefaultMessage(); - List<Object> copy = new ArrayList<Object>(lines); - msg.setBody(copy); + List<String> copy = new ArrayList<String>(lines); + msg.setBody(endpoint.getGroupStrategy().groupLines(copy)); exchange.setIn(msg); // clear lines Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java?rev=1291263&r1=1291262&r2=1291263&view=diff ============================================================================== --- camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java (original) +++ camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java Mon Feb 20 14:02:01 2012 @@ -41,6 +41,7 @@ public class StreamEndpoint extends Defa private long initialPromptDelay = 2000; private int groupLines; private Charset charset; + private GroupStrategy groupStrategy = new DefaultGroupStrategy(); public StreamEndpoint(String endpointUri, Component component) throws Exception { super(endpointUri, component); @@ -129,6 +130,14 @@ public class StreamEndpoint extends Defa public void setScanStream(boolean scanStream) { this.scanStream = scanStream; } + + public GroupStrategy getGroupStrategy() { + return groupStrategy; + } + + public void setGroupStrategy(GroupStrategy strategy) { + this.groupStrategy = strategy; + } public boolean isRetry() { return retry; Added: camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java?rev=1291263&view=auto ============================================================================== --- camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java (added) +++ camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java Mon Feb 20 14:02:01 2012 @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stream; + +import java.util.List; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; +import org.junit.Test; + +public class StreamGroupLinesStrategyTest extends StreamGroupLinesTest { + + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + jndi.bind("myGroupStrategy", new MyGroupStrategy()); + return jndi; + } + + class MyGroupStrategy implements GroupStrategy { + + @Override + public Object groupLines(List<String> lines) { + StringBuffer buffer = new StringBuffer(); + for (String line : lines) { + buffer.append(line); + buffer.append("\n"); + } + return buffer.toString(); + } + } + + @Test + public void testGroupLines() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(2); + + assertMockEndpointsSatisfied(); + + Object result = mock.getExchanges().get(0).getIn().getBody(); + assertEquals("Get a wrong result.", "A\nB\nC\n", result); + + + Object result2 = mock.getExchanges().get(1).getIn().getBody(); + assertEquals("Get a wrong result.", "D\nE\nF\n", result2); + + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("stream:file?fileName=target/stream/streamfile.txt&groupLines=3&groupStrategy=#myGroupStrategy").to("mock:result"); + } + }; + } + +}