Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-430-1 262e50393 -> 5f4a5ccae


ignite-430 Implement IgniteSocketStreamer to stream data from TCP socket.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5f4a5cca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5f4a5cca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5f4a5cca

Branch: refs/heads/ignite-430-1
Commit: 5f4a5ccaebfc00971f3425b129722426810e1d1e
Parents: 262e503
Author: agura <ag...@gridgain.com>
Authored: Tue Apr 21 00:04:53 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Tue Apr 21 00:04:53 2015 +0300

----------------------------------------------------------------------
 .../streaming/socket/SocketStreamerExample.java |  2 +
 .../socket/ZStringsSocketStreamerExample.java   |  2 +
 .../internal/util/nio/GridBufferedParser.java   |  4 --
 .../util/nio/GridNioDelimitedBuffer.java        | 41 +++++++-------------
 .../ignite/stream/adapters/StreamAdapter.java   | 17 ++++++++
 .../stream/socket/IgniteSocketStreamer.java     | 20 +++-------
 .../socket/IgniteSocketStreamerSelfTest.java    |  2 +
 7 files changed, 43 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f4a5cca/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
index 1ee916f..73cb970 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
@@ -89,6 +89,8 @@ public class SocketStreamerExample {
 
                 sockStmr.setPort(PORT);
 
+                sockStmr.setIgnite(ignite);
+
                 sockStmr.setStreamer(stmr);
 
                 sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, Long>() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f4a5cca/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
index 9fd229e..a535c73 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
@@ -98,6 +98,8 @@ public class ZStringsSocketStreamerExample {
 
                 sockStmr.setDelimiter(DELIM);
 
+                sockStmr.setIgnite(ignite);
+
                 sockStmr.setStreamer(stmr);
 
                 // Converter from zero-terminated string to Java strings.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f4a5cca/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
index 3f81dc4..a03d2c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
 
 import java.io.*;
 import java.nio.*;
@@ -33,9 +32,6 @@ import java.nio.*;
  *     | MSG_SIZE  |   MESSAGE  | MSG_SIZE  |   MESSAGE  |
  *     +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
  * </pre>
- * <p>
- * It expects that first 4 bytes in stream are {@link U#IGNITE_HEADER}. If beginning of a stream,
- * isn't equal to these bytes than exception will be thrown.
  */
 public class GridBufferedParser implements GridNioParser {
     /** Buffer metadata key. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f4a5cca/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
index bbcedf2..230297d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
@@ -26,18 +26,18 @@ import java.util.*;
  * Buffer with message delimiter support.
  */
 public class GridNioDelimitedBuffer {
-    /** Buffer size. */
-    private static final int BUFFER_SIZE = 512;
-
     /** Delimiter. */
     private final byte[] delim;
 
     /** Data. */
-    private byte[] data;
+    private byte[] data = new byte[512];
 
     /** Count. */
     private int cnt;
 
+    /** Index. */
+    private int idx;
+
     /**
      * @param delim Delimiter.
      */
@@ -55,8 +55,7 @@ public class GridNioDelimitedBuffer {
      */
     private void reset() {
         cnt = 0;
-
-        data = new byte[BUFFER_SIZE];
+        idx = 0;
     }
 
     /**
@@ -64,14 +63,20 @@ public class GridNioDelimitedBuffer {
      * @return Message bytes or {@code null} if message is not fully read yet.
      */
     @Nullable public byte[] read(ByteBuffer buf) {
-        for(; buf.hasRemaining();) {
-
+        while(buf.hasRemaining()) {
             if (cnt == data.length)
                 data = Arrays.copyOf(data, data.length * 2);
 
-            data[cnt++] = buf.get();
+            byte b = buf.get();
 
-            if (cnt >= delim.length && found()) {
+            data[cnt++] = b;
+
+            if (b == delim[idx])
+                idx++;
+            else
+                idx = (b == delim[0]) ? 1 : 0;
+
+            if (idx == delim.length) {
                 byte[] bytes = Arrays.copyOfRange(data, 0, cnt - delim.length);
 
                 reset();
@@ -82,20 +87,4 @@ public class GridNioDelimitedBuffer {
 
         return null;
     }
-
-    /**
-     * Tries find delimiter in buffer.
-     *
-     * @return {@code True} if delimiter found, {@code false} - otherwise.
-     */
-    private boolean found() {
-        int from = cnt - delim.length;
-
-        for (int i = 0; i < delim.length ; i++) {
-            if (delim[i] != data[from + i])
-                return false;
-        }
-
-        return true;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f4a5cca/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
index c729362..b99521a 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
@@ -34,6 +34,9 @@ public abstract class StreamAdapter<T, K, V> {
     /** Streamer. */
     private IgniteDataStreamer<K, V> stmr;
 
+    /** Ignite. */
+    private Ignite ignite;
+
     /**
      * Empty constructor.
      */
@@ -81,6 +84,20 @@ public abstract class StreamAdapter<T, K, V> {
     }
 
     /**
+     * @return Provided {@link Ignite} instance.
+     */
+    public Ignite getIgnite() {
+        return ignite;
+    }
+
+    /**
+     * @param ignite {@link Ignite} instance.
+     */
+    public void setIgnite(Ignite ignite) {
+        this.ignite = ignite;
+    }
+
+    /**
      * Converts given message to a tuple and adds it to the underlying streamer.
      *
      * @param msg Message to convert.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f4a5cca/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
index f506af3..66369ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
@@ -18,18 +18,15 @@
 package org.apache.ignite.stream.socket;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.datastreamer.*;
 import org.apache.ignite.internal.util.nio.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.marshaller.jdk.*;
 import org.apache.ignite.stream.adapters.*;
-
 import org.jetbrains.annotations.*;
 
 import java.net.*;
 import java.nio.*;
-import java.util.*;
 
 /**
  * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and
@@ -132,15 +129,12 @@ public class IgniteSocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
      * @throws IgniteException If failed.
      */
     public void start() {
-        A.notNull(getTupleExtractor(), "tupleExtractor is null");
-        A.notNull(getStreamer(), "streamer is null");
+        A.notNull(getTupleExtractor(), "tupleExtractor");
+        A.notNull(getStreamer(), "streamer");
+        A.notNull(getIgnite(), "ignite");
         A.ensure(threads > 0, "threads > 0");
 
-        UUID uuid = ((DataStreamerImpl)getStreamer()).cacheObjectContext().kernalContext().localNodeId();
-
-        Ignite ignite = Ignition.ignite(uuid);
-
-        log = ignite.log();
+        log = getIgnite().log();
 
         GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() {
             @Override public void onConnected(GridNioSession ses) {
@@ -156,11 +150,7 @@ public class IgniteSocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
             }
 
             @Override public void onMessage(GridNioSession ses, byte[] msg) {
-                T obj = converter.convert(msg);
-
-                Map.Entry<K, V> e = getTupleExtractor().extract(obj);
-
-                getStreamer().addData(e);
+                addMessage(converter.convert(msg));
             }
         };
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f4a5cca/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
index 6d8a9b4..19852ce 100644
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
@@ -244,6 +244,8 @@ public class IgniteSocketStreamerSelfTest extends GridCommonAbstractTest {
 
             IgniteCache<Integer, String> cache = ignite.cache(null);
 
+            sockStmr.setIgnite(ignite);
+
             sockStmr.setStreamer(stmr);
 
             sockStmr.setPort(port);

Reply via email to