Repository: camel Updated Branches: refs/heads/master dff401647 -> 4f0feb7a2
Component docs Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d5e4767a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d5e4767a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d5e4767a Branch: refs/heads/master Commit: d5e4767a7f0b1e352623674e4967ae97e11f5464 Parents: dff4016 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Apr 27 09:56:22 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Apr 27 09:56:22 2015 +0200 ---------------------------------------------------------------------- .../component/stream/DefaultGroupStrategy.java | 4 +- .../camel/component/stream/GroupStrategy.java | 3 + .../camel/component/stream/StreamEndpoint.java | 79 ++++++++++++++++---- 3 files changed, 70 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d5e4767a/components/camel-stream/src/main/java/org/apache/camel/component/stream/DefaultGroupStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/DefaultGroupStrategy.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/DefaultGroupStrategy.java index abf9c48..a2fb8bb 100644 --- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/DefaultGroupStrategy.java +++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/DefaultGroupStrategy.java @@ -18,7 +18,9 @@ package org.apache.camel.component.stream; import java.util.List; - +/** + * A default GroupStrategy that just groups the lines by returning the lines as a list as-is + */ public class DefaultGroupStrategy implements GroupStrategy { @Override http://git-wip-us.apache.org/repos/asf/camel/blob/d5e4767a/components/camel-stream/src/main/java/org/apache/camel/component/stream/GroupStrategy.java 
---------------------------------------------------------------------- diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/GroupStrategy.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/GroupStrategy.java index 21096a7..502ddeb 100644 --- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/GroupStrategy.java +++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/GroupStrategy.java @@ -18,6 +18,9 @@ package org.apache.camel.component.stream; import java.util.List; +/** + * Strategy to control how lines should be grouped together. + */ public interface GroupStrategy { Object groupLines(List<String> lines); http://git-wip-us.apache.org/repos/asf/camel/blob/d5e4767a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java index a2c2d16..e1a19ff 100644 --- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java +++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java @@ -35,35 +35,35 @@ import org.slf4j.LoggerFactory; public class StreamEndpoint extends DefaultEndpoint { private static final Logger LOG = LoggerFactory.getLogger(StreamEndpoint.class); + private transient Charset charset; + @UriPath @Metadata(required = "true") private String url; @UriParam private String fileName; - @UriParam + @UriParam(label = "consumer") private boolean scanStream; - @UriParam + @UriParam(label = "consumer") private boolean retry; - @UriParam + @UriParam(label = "producer") private boolean closeOnDone; - @UriParam + @UriParam(label = "consumer") private long scanStreamDelay; - @UriParam + 
@UriParam(label = "producer") private long delay; @UriParam private String encoding; - @UriParam + @UriParam(label = "consumer") private String promptMessage; - @UriParam + @UriParam(label = "consumer") private long promptDelay; - @UriParam(defaultValue = "2000") + @UriParam(label = "consumer", defaultValue = "2000") private long initialPromptDelay = 2000; - @UriParam + @UriParam(label = "consumer") private int groupLines; - @UriParam + @UriParam(label = "producer") private int autoCloseCount; - @UriParam - private Charset charset; - @UriParam + @UriParam(label = "consumer") private GroupStrategy groupStrategy = new DefaultGroupStrategy(); public StreamEndpoint(String endpointUri, Component component) throws Exception { @@ -104,6 +104,9 @@ public class StreamEndpoint extends DefaultEndpoint { return fileName; } + /** + * When using the stream:file URI format, this option specifies the filename to stream to/from. + */ public void setFileName(String fileName) { this.fileName = fileName; } @@ -112,6 +115,10 @@ public class StreamEndpoint extends DefaultEndpoint { return url; } + /** + * When using the stream:url URI format, this option specifies the URL to stream to/from. + * The input/output stream will be opened using the JDK URLConnection facility. + */ public void setUrl(String url) { this.url = url; } @@ -120,6 +127,9 @@ public class StreamEndpoint extends DefaultEndpoint { return delay; } + /** + * Initial delay in milliseconds before producing the stream. + */ public void setDelay(long delay) { this.delay = delay; } @@ -128,6 +138,10 @@ public class StreamEndpoint extends DefaultEndpoint { return encoding; } + /** + * You can configure the encoding (a charset name) to use for text-based streams (for example, when the message body is a String object). + * If not provided, Camel uses the JVM default Charset. 
+ */ public void setEncoding(String encoding) { this.encoding = encoding; } @@ -136,6 +150,9 @@ public class StreamEndpoint extends DefaultEndpoint { return promptMessage; } + /** + * Message prompt to use when reading from stream:in; for example, you could set this to Enter a command: + */ public void setPromptMessage(String promptMessage) { this.promptMessage = promptMessage; } @@ -144,6 +161,9 @@ public class StreamEndpoint extends DefaultEndpoint { return promptDelay; } + /** + * Optional delay in milliseconds before showing the message prompt. + */ public void setPromptDelay(long promptDelay) { this.promptDelay = promptDelay; } @@ -152,6 +172,10 @@ public class StreamEndpoint extends DefaultEndpoint { return initialPromptDelay; } + /** + * Initial delay in milliseconds before showing the message prompt. This delay occurs only once. + * Can be used during system startup to avoid message prompts being written while other logging is done to the system out. + */ public void setInitialPromptDelay(long initialPromptDelay) { this.initialPromptDelay = initialPromptDelay; } @@ -160,6 +184,9 @@ public class StreamEndpoint extends DefaultEndpoint { return scanStream; } + /** + * To be used for continuously reading a stream such as the unix tail command. + */ public void setScanStream(boolean scanStream) { this.scanStream = scanStream; } @@ -167,7 +194,10 @@ public class StreamEndpoint extends DefaultEndpoint { public GroupStrategy getGroupStrategy() { return groupStrategy; } - + + /** + * Allows using a custom GroupStrategy to control how to group lines. 
+ */ public void setGroupStrategy(GroupStrategy strategy) { this.groupStrategy = strategy; } @@ -176,6 +206,9 @@ public class StreamEndpoint extends DefaultEndpoint { return retry; } + /** + * Will retry opening the file if it's overwritten, somewhat like tail --retry + */ public void setRetry(boolean retry) { this.retry = retry; } @@ -183,7 +216,12 @@ public class StreamEndpoint extends DefaultEndpoint { public boolean isCloseOnDone() { return closeOnDone; } - + + /** + * This option is used in combination with Splitter and streaming to the same file. + * The idea is to keep the stream open and only close when the Splitter is done, to improve performance. + * Mind this requires that you only stream to the same file, and not 2 or more files. + */ public void setCloseOnDone(boolean closeOnDone) { this.closeOnDone = closeOnDone; } @@ -192,6 +230,9 @@ public class StreamEndpoint extends DefaultEndpoint { return scanStreamDelay; } + /** + * Delay in milliseconds between read attempts when using scanStream. + */ public void setScanStreamDelay(long scanStreamDelay) { this.scanStreamDelay = scanStreamDelay; } @@ -200,6 +241,10 @@ public class StreamEndpoint extends DefaultEndpoint { return groupLines; } + /** + * To group X number of lines in the consumer. + * For example to group 10 lines and therefore only spit out an Exchange with 10 lines, instead of 1 Exchange per line. + */ public void setGroupLines(int groupLines) { this.groupLines = groupLines; } @@ -208,6 +253,10 @@ public class StreamEndpoint extends DefaultEndpoint { return autoCloseCount; } + /** + * Number of messages to process before closing stream on Producer side. + * Never close stream by default (only when Producer is stopped). If more messages are sent, the stream is reopened for another autoCloseCount batch. + */ public void setAutoCloseCount(int autoCloseCount) { this.autoCloseCount = autoCloseCount; }