Repository: incubator-ignite Updated Branches: refs/heads/ignite-430-1 262e50393 -> 5f4a5ccae
ignite-430 Implement IgniteSocketStreamer to stream data from TCP socket. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5f4a5cca Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5f4a5cca Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5f4a5cca Branch: refs/heads/ignite-430-1 Commit: 5f4a5ccaebfc00971f3425b129722426810e1d1e Parents: 262e503 Author: agura <ag...@gridgain.com> Authored: Tue Apr 21 00:04:53 2015 +0300 Committer: agura <ag...@gridgain.com> Committed: Tue Apr 21 00:04:53 2015 +0300 ---------------------------------------------------------------------- .../streaming/socket/SocketStreamerExample.java | 2 + .../socket/ZStringsSocketStreamerExample.java | 2 + .../internal/util/nio/GridBufferedParser.java | 4 -- .../util/nio/GridNioDelimitedBuffer.java | 41 +++++++------------- .../ignite/stream/adapters/StreamAdapter.java | 17 ++++++++ .../stream/socket/IgniteSocketStreamer.java | 20 +++------- .../socket/IgniteSocketStreamerSelfTest.java | 2 + 7 files changed, 43 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f4a5cca/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java index 1ee916f..73cb970 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java @@ -89,6 +89,8 @@ public class SocketStreamerExample { sockStmr.setPort(PORT); + sockStmr.setIgnite(ignite); + sockStmr.setStreamer(stmr); sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, Long>() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f4a5cca/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java index 9fd229e..a535c73 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java @@ -98,6 +98,8 @@ public class ZStringsSocketStreamerExample { sockStmr.setDelimiter(DELIM); + sockStmr.setIgnite(ignite); + sockStmr.setStreamer(stmr); // Converter from zero-terminated string to Java strings. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f4a5cca/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java index 3f81dc4..a03d2c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; import java.nio.*; @@ -33,9 +32,6 @@ import java.nio.*; * | MSG_SIZE | MESSAGE | MSG_SIZE | MESSAGE | * +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+ * </pre> - * <p> - * It expects that first 4 bytes in stream are {@link U#IGNITE_HEADER}. If beginning of a stream, - * isn't equal to these bytes than exception will be thrown. */ public class GridBufferedParser implements GridNioParser { /** Buffer metadata key. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f4a5cca/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java index bbcedf2..230297d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java @@ -26,18 +26,18 @@ import java.util.*; * Buffer with message delimiter support. */ public class GridNioDelimitedBuffer { - /** Buffer size. */ - private static final int BUFFER_SIZE = 512; - /** Delimiter. */ private final byte[] delim; /** Data. */ - private byte[] data; + private byte[] data = new byte[512]; /** Count. */ private int cnt; + /** Index. */ + private int idx; + /** * @param delim Delimiter. */ @@ -55,8 +55,7 @@ public class GridNioDelimitedBuffer { */ private void reset() { cnt = 0; - - data = new byte[BUFFER_SIZE]; + idx = 0; } /** @@ -64,14 +63,20 @@ public class GridNioDelimitedBuffer { * @return Message bytes or {@code null} if message is not fully read yet. */ @Nullable public byte[] read(ByteBuffer buf) { - for(; buf.hasRemaining();) { - + while(buf.hasRemaining()) { if (cnt == data.length) data = Arrays.copyOf(data, data.length * 2); - data[cnt++] = buf.get(); + byte b = buf.get(); - if (cnt >= delim.length && found()) { + data[cnt++] = b; + + if (b == delim[idx]) + idx++; + else + idx = (b == delim[0]) ? 1 : 0; + + if (idx == delim.length) { byte[] bytes = Arrays.copyOfRange(data, 0, cnt - delim.length); reset(); @@ -82,20 +87,4 @@ public class GridNioDelimitedBuffer { return null; } - - /** - * Tries find delimiter in buffer. - * - * @return {@code True} if delimiter found, {@code false} - otherwise. - */ - private boolean found() { - int from = cnt - delim.length; - - for (int i = 0; i < delim.length ; i++) { - if (delim[i] != data[from + i]) - return false; - } - - return true; - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f4a5cca/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java index c729362..b99521a 100644 --- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java @@ -34,6 +34,9 @@ public abstract class StreamAdapter<T, K, V> { /** Streamer. */ private IgniteDataStreamer<K, V> stmr; + /** Ignite. */ + private Ignite ignite; + /** * Empty constructor. */ @@ -81,6 +84,20 @@ public abstract class StreamAdapter<T, K, V> { } /** + * @return Provided {@link Ignite} instance. + */ + public Ignite getIgnite() { + return ignite; + } + + /** + * @param ignite {@link Ignite} instance. + */ + public void setIgnite(Ignite ignite) { + this.ignite = ignite; + } + + /** * Converts given message to a tuple and adds it to the underlying streamer. * * @param msg Message to convert. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f4a5cca/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java index f506af3..66369ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java @@ -18,18 +18,15 @@ package org.apache.ignite.stream.socket; import org.apache.ignite.*; -import org.apache.ignite.internal.processors.datastreamer.*; import org.apache.ignite.internal.util.nio.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.marshaller.jdk.*; import org.apache.ignite.stream.adapters.*; - import org.jetbrains.annotations.*; import java.net.*; import java.nio.*; -import java.util.*; /** * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and @@ -132,15 +129,12 @@ public class IgniteSocketStreamer<T, K, V> extends StreamAdapter<T, K, V> { * @throws IgniteException If failed. */ public void start() { - A.notNull(getTupleExtractor(), "tupleExtractor is null"); - A.notNull(getStreamer(), "streamer is null"); + A.notNull(getTupleExtractor(), "tupleExtractor"); + A.notNull(getStreamer(), "streamer"); + A.notNull(getIgnite(), "ignite"); A.ensure(threads > 0, "threads > 0"); - UUID uuid = ((DataStreamerImpl)getStreamer()).cacheObjectContext().kernalContext().localNodeId(); - - Ignite ignite = Ignition.ignite(uuid); - - log = ignite.log(); + log = getIgnite().log(); GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() { @Override public void onConnected(GridNioSession ses) { @@ -156,11 +150,7 @@ public class IgniteSocketStreamer<T, K, V> extends StreamAdapter<T, K, V> { } @Override public void onMessage(GridNioSession ses, byte[] msg) { - T obj = converter.convert(msg); - - Map.Entry<K, V> e = getTupleExtractor().extract(obj); - - getStreamer().addData(e); + addMessage(converter.convert(msg)); } }; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f4a5cca/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java index 6d8a9b4..19852ce 100644 --- a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java @@ -244,6 +244,8 @@ public class IgniteSocketStreamerSelfTest extends GridCommonAbstractTest { IgniteCache<Integer, String> cache = ignite.cache(null); + sockStmr.setIgnite(ignite); + sockStmr.setStreamer(stmr); sockStmr.setPort(port);