Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-430 923162280 -> a30ec69a7


ignite-430 IgniteSocketStreamer to stream data from TCP socket.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a30ec69a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a30ec69a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a30ec69a

Branch: refs/heads/ignite-430
Commit: a30ec69a7c99862d423ba0bc67226cd3ef213fe8
Parents: 9231622
Author: Andrey Gura <ag...@gridgain.com>
Authored: Tue Mar 24 21:09:21 2015 +0300
Committer: Andrey Gura <ag...@gridgain.com>
Committed: Tue Mar 24 21:09:21 2015 +0300

----------------------------------------------------------------------
 .../streaming/SocketObjectStreamerExample.java  | 130 ------------------
 .../streaming/SocketStreamerExample.java        | 129 +++++++++++++++++
 .../streaming/SocketTextStreamerExample.java    | 125 -----------------
 .../streaming/TextSocketStreamerExample.java    | 137 +++++++++++++++++++
 .../ignite/streaming/IgniteSocketStreamer.java  | 112 ++++++---------
 .../streaming/IgniteTextSocketStreamer.java     |  61 +++++++++
 .../ignite/streaming/ObjectStreamConverter.java |  92 -------------
 .../ignite/streaming/TextStreamConverter.java   |  79 -----------
 .../streaming/IgniteSocketStreamerTest.java     | 113 +++++++++++++++
 .../streaming/IgniteTextSocketStreamerTest.java | 102 ++++++++++++++
 10 files changed, 586 insertions(+), 494 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketObjectStreamerExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketObjectStreamerExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketObjectStreamerExample.java
deleted file mode 100644
index 3a72071..0000000
--- 
a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketObjectStreamerExample.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.datagrid.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.streaming.*;
-
-import java.io.*;
-import java.net.*;
-
-/**
- * Demonstrates how cache can be populated with data utilizing {@link 
IgniteSocketStreamer} API.
- * <p>
- * Remote nodes should always be started with special configuration file which
- * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-cache.xml'}.
- * <p>
- * Alternatively you can run {@link CacheNodeStartup} in another JVM which will
- * start node with {@code examples/config/example-cache.xml} configuration.
- */
-public class SocketObjectStreamerExample {
-    /** Cache name. */
-    private static final String CACHE_NAME = "partitioned";
-
-    /** Number of entries to load. */
-    private static final int ENTRY_COUNT = 500000;
-
-    /** Heap size required to run this example. */
-    public static final int MIN_MEMORY = 512 * 1024 * 1024;
-
-    /** Streaming server host. */
-    private static final String HOST = "localhost";
-
-    /** Streaming server port. */
-    private static final int PORT = 5555;
-
-    /**
-     * Executes example.
-     *
-     * @param args Command line arguments, none required.
-     * @throws IgniteException If example execution failed.
-     */
-    public static void main(String[] args) throws IgniteException {
-        ExamplesUtils.checkMinMemory(MIN_MEMORY);
-
-        try (Ignite ignite = 
Ignition.start("examples/config/example-cache.xml")) {
-            System.out.println();
-            System.out.println(">>> Cache data streamer example started.");
-
-            startServer();
-
-            // Clean up caches on all nodes before run.
-            ignite.jcache(CACHE_NAME).clear();
-
-            System.out.println();
-            System.out.println(">>> Cache clear finished.");
-
-            long start = System.currentTimeMillis();
-
-            try (IgniteDataStreamer<Integer, IgniteBiTuple<Integer, String>> 
stmr = ignite.dataStreamer(CACHE_NAME)) {
-                // Configure loader.
-                stmr.perNodeBufferSize(1024);
-                stmr.perNodeParallelOperations(8);
-
-                IgniteClosure<IgniteBiTuple<Integer, String>, Integer> keyClos 
=
-                        new IgniteClosure<IgniteBiTuple<Integer, String>, 
Integer>() {
-                    @Override public Integer apply(IgniteBiTuple<Integer, 
String> input) {
-                        return input.getKey();
-                    }
-                };
-
-                IgniteSocketStreamer<Integer, IgniteBiTuple<Integer, String>> 
sockStmr =
-                        new IgniteSocketStreamer<>(HOST, PORT, stmr, new 
ObjectStreamConverter<>(keyClos));
-
-                sockStmr.loadData();
-            }
-
-            long end = System.currentTimeMillis();
-
-            System.out.println(">>> Cache Size " + 
ignite.jcache(CACHE_NAME).size(CachePeekMode.PRIMARY));
-
-            System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + 
(end - start) + "ms.");
-        }
-    }
-
-    /**
-     * Starts streaming server and writes data into socket.
-     */
-    private static void startServer() {
-        new Thread() {
-            @Override public void run() {
-                System.out.println();
-                System.out.println(">>> Streaming server thread is started.");
-
-                try (ServerSocket srvSock = new ServerSocket(PORT);
-                     Socket sock = srvSock.accept();
-                     ObjectOutputStream oos =
-                             new ObjectOutputStream(new 
BufferedOutputStream(sock.getOutputStream()))) {
-
-                    for (int i = 0; i < ENTRY_COUNT; i++)
-                        oos.writeObject(new IgniteBiTuple<>(i, 
Integer.toString(i)));
-                }
-                catch (IOException e) {
-                    // No-op.
-                }
-
-                System.out.println();
-                System.out.println(">>> Streaming server thread is finished.");
-            }
-        }.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
new file mode 100644
index 0000000..234ef76
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.datagrid.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.streaming.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Demonstrates how cache can be populated with data utilizing {@link 
IgniteSocketStreamer} API.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-cache.xml'}.
+ * <p>
+ * Alternatively you can run {@link CacheNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-cache.xml} configuration.
+ */
+public class SocketStreamerExample {
+    /** Cache name. */
+    private static final String CACHE_NAME = "partitioned";
+
+    /** Number of entries to load. */
+    private static final int ENTRY_COUNT = 500000;
+
+    /** Heap size required to run this example. */
+    public static final int MIN_MEMORY = 512 * 1024 * 1024;
+
+    /** Streaming server host. */
+    private static final String HOST = "localhost";
+
+    /** Streaming server port. */
+    private static final int PORT = 5555;
+
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     * @throws IgniteException If example execution failed.
+     */
+    public static void main(String[] args) throws IgniteException {
+        ExamplesUtils.checkMinMemory(MIN_MEMORY);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-cache.xml")) {
+            System.out.println();
+            System.out.println(">>> Cache data streamer example started.");
+
+            startServer();
+
+            // Clean up caches on all nodes before run.
+            ignite.jcache(CACHE_NAME).clear();
+
+            System.out.println();
+            System.out.println(">>> Cache clear finished.");
+
+            long start = System.currentTimeMillis();
+
+            try (IgniteDataStreamer<Integer, String> stmr = 
ignite.dataStreamer(CACHE_NAME)) {
+                // Configure loader.
+                stmr.perNodeBufferSize(1024);
+                stmr.perNodeParallelOperations(8);
+
+                IgniteClosure<IgniteBiTuple<Integer, String>, 
Map.Entry<Integer, String>> converter =
+                    new IgniteClosure<IgniteBiTuple<Integer, String>, 
Map.Entry<Integer, String>>() {
+                        @Override public Map.Entry<Integer, String> 
apply(IgniteBiTuple<Integer, String> input) {
+                            return new IgniteBiTuple<>(input.getKey(), 
input.getValue());
+                        }
+                    };
+
+                IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, 
String> sockStmr =
+                    new IgniteSocketStreamer<>(HOST, PORT, stmr, converter);
+
+                sockStmr.loadData();
+            }
+
+            long end = System.currentTimeMillis();
+
+            System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + 
(end - start) + "ms.");
+        }
+    }
+
+    /**
+     * Starts streaming server and writes data into socket.
+     */
+    private static void startServer() {
+        new Thread() {
+            @Override public void run() {
+                System.out.println();
+                System.out.println(">>> Streaming server thread is started.");
+
+                try (ServerSocket srvSock = new ServerSocket(PORT);
+                     Socket sock = srvSock.accept();
+                     ObjectOutputStream oos =
+                         new ObjectOutputStream(new 
BufferedOutputStream(sock.getOutputStream()))) {
+
+                    for (int i = 0; i < ENTRY_COUNT; i++)
+                        oos.writeObject(new IgniteBiTuple<>(i, 
Integer.toString(i)));
+                }
+                catch (IOException e) {
+                    // No-op.
+                }
+
+                System.out.println();
+                System.out.println(">>> Streaming server thread is finished.");
+            }
+        }.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketTextStreamerExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketTextStreamerExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketTextStreamerExample.java
deleted file mode 100644
index 9a2094f..0000000
--- 
a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketTextStreamerExample.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.datagrid.*;
-import org.apache.ignite.streaming.*;
-
-import java.io.*;
-import java.net.*;
-
-/**
- * Demonstrates how cache can be populated with data utilizing {@link 
IgniteSocketStreamer} API.
- * <p>
- * Remote nodes should always be started with special configuration file which
- * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-cache.xml'}.
- * <p>
- * Alternatively you can run {@link CacheNodeStartup} in another JVM which will
- * start node with {@code examples/config/example-cache.xml} configuration.
- */
-public class SocketTextStreamerExample {
-    /** Cache name. */
-    private static final String CACHE_NAME = "partitioned";
-
-    /** Number of entries to load. */
-    private static final int ENTRY_COUNT = 500000;
-
-    /** Heap size required to run this example. */
-    public static final int MIN_MEMORY = 512 * 1024 * 1024;
-
-    /** Streaming server host. */
-    private static final String HOST = "localhost";
-
-    /** Streaming server port. */
-    private static final int PORT = 5555;
-
-    /**
-     * Executes example.
-     *
-     * @param args Command line arguments, none required.
-     * @throws IgniteException If example execution failed.
-     */
-    public static void main(String[] args) throws IgniteException {
-        ExamplesUtils.checkMinMemory(MIN_MEMORY);
-
-        try (Ignite ignite = 
Ignition.start("examples/config/example-cache.xml")) {
-            System.out.println();
-            System.out.println(">>> Cache data streamer example started.");
-
-            startServer();
-
-            // Clean up caches on all nodes before run.
-            ignite.jcache(CACHE_NAME).clear();
-
-            System.out.println();
-            System.out.println(">>> Cache clear finished.");
-
-            long start = System.currentTimeMillis();
-
-            try (IgniteDataStreamer<String, String> stmr = 
ignite.dataStreamer(CACHE_NAME)) {
-                // Configure loader.
-                stmr.perNodeBufferSize(1024);
-                stmr.perNodeParallelOperations(8);
-
-                IgniteSocketStreamer<String, String> sockStmr =
-                        new IgniteSocketStreamer<>(HOST, PORT, stmr, new 
TextStreamConverter());
-
-                sockStmr.loadData();
-            }
-
-            long end = System.currentTimeMillis();
-
-            System.out.println(">>> Cache Size " + 
ignite.jcache(CACHE_NAME).size(CachePeekMode.PRIMARY));
-
-            System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + 
(end - start) + "ms.");
-        }
-    }
-
-    /**
-     * Starts streaming server and writes data into socket.
-     */
-    private static void startServer() {
-        new Thread() {
-            @Override public void run() {
-                System.out.println();
-                System.out.println(">>> Streaming server thread is started.");
-
-                try (ServerSocket srvSock = new ServerSocket(PORT);
-                     Socket sock = srvSock.accept();
-                     BufferedWriter writer =
-                             new BufferedWriter(new 
OutputStreamWriter(sock.getOutputStream(), "UTF-8"))) {
-
-                    for (int i = 0; i < ENTRY_COUNT; i++) {
-                        String num = Integer.toString(i);
-                        writer.write(num + '=' + num);
-                        writer.newLine();
-                    }
-                }
-                catch (IOException e) {
-                    // No-op.
-                }
-
-                System.out.println();
-                System.out.println(">>> Streaming server thread is finished.");
-            }
-        }.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
new file mode 100644
index 0000000..d6c6daa
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.datagrid.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.streaming.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Demonstrates how cache can be populated with data utilizing {@link 
IgniteTextSocketStreamer} API.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-cache.xml'}.
+ * <p>
+ * Alternatively you can run {@link CacheNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-cache.xml} configuration.
+ */
+public class TextSocketStreamerExample {
+    /** Cache name. */
+    private static final String CACHE_NAME = "partitioned";
+
+    /** Number of entries to load. */
+    private static final int ENTRY_COUNT = 500000;
+
+    /** Heap size required to run this example. */
+    public static final int MIN_MEMORY = 512 * 1024 * 1024;
+
+    /** Streaming server host. */
+    private static final String HOST = "localhost";
+
+    /** Streaming server port. */
+    private static final int PORT = 5555;
+
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     * @throws IgniteException If example execution failed.
+     */
+    public static void main(String[] args) throws IgniteException {
+        ExamplesUtils.checkMinMemory(MIN_MEMORY);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-cache.xml")) {
+            System.out.println();
+            System.out.println(">>> Cache data streamer example started.");
+
+            startServer();
+
+            // Clean up caches on all nodes before run.
+            ignite.jcache(CACHE_NAME).clear();
+
+            System.out.println();
+            System.out.println(">>> Cache clear finished.");
+
+            long start = System.currentTimeMillis();
+
+            try (IgniteDataStreamer<Integer, String> stmr = 
ignite.dataStreamer(CACHE_NAME)) {
+                // Configure loader.
+                stmr.perNodeBufferSize(1024);
+                stmr.perNodeParallelOperations(8);
+
+                IgniteClosure<String, Map.Entry<Integer, String>> converter =
+                    new IgniteClosure<String, Map.Entry<Integer, String>>() {
+                        @Override public Map.Entry<Integer, String> 
apply(String input) {
+                            String[] pair = input.split("=");
+                            return new 
IgniteBiTuple<>(Integer.parseInt(pair[0]), pair[1]);
+                        }
+                };
+
+                IgniteTextSocketStreamer<Integer, String> sockStmr =
+                    new IgniteTextSocketStreamer<>(HOST, PORT, stmr, 
converter);
+
+                sockStmr.loadData();
+            }
+
+            long end = System.currentTimeMillis();
+
+            System.out.println(">>> Cache Size " + 
ignite.jcache(CACHE_NAME).size(CachePeekMode.PRIMARY));
+
+            System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + 
(end - start) + "ms.");
+        }
+    }
+
+    /**
+     * Starts streaming server and writes data into socket.
+     */
+    private static void startServer() {
+        new Thread() {
+            @Override public void run() {
+                System.out.println();
+                System.out.println(">>> Streaming server thread is started.");
+
+                try (ServerSocket srvSock = new ServerSocket(PORT);
+                     Socket sock = srvSock.accept();
+                     BufferedWriter writer =
+                         new BufferedWriter(new 
OutputStreamWriter(sock.getOutputStream(), "UTF-8"))) {
+
+                    for (int i = 0; i < ENTRY_COUNT; i++) {
+                        String num = Integer.toString(i);
+
+                        writer.write(num + '=' + num);
+
+                        writer.newLine();
+                    }
+                }
+                catch (IOException e) {
+                    // No-op.
+                }
+
+                System.out.println();
+                System.out.println(">>> Streaming server thread is finished.");
+            }
+        }.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
 
b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
index e122b66..54dcdce 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.streaming;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 
@@ -25,109 +26,84 @@ import java.io.*;
 import java.net.*;
 import java.util.*;
 
-public class IgniteSocketStreamer<K, V> {
-    /** Target streamer. */
-    private final IgniteDataStreamer<K, V> streamer;
-
-    /** Stream to entries iterator transformer. */
-    private final IgniteClosure<InputStream, Iterator<Map.Entry<K, V>>> f;
-
+/**
+ * Data streamer is responsible for streaming data from socket into cache. 
Every object obtained from socket is converted
+ * to a key-value pair using the converter.
+ *
+ * @param <E> Type of element obtained from socket.
+ * @param <K> Cache entry key type.
+ * @param <V> Cache entry value type.
+ */
+public class IgniteSocketStreamer<E, K, V> {
     /** Host. */
     private final String host;
 
     /** Port. */
     private final int port;
 
+    /** Target streamer. */
+    protected final IgniteDataStreamer<K, V> streamer;
+
+    /** Converter from an element obtained from socket to a cache entry. */
+    protected final IgniteClosure<E, Map.Entry<K, V>> converter;
+
     /**
      * Constructs socket streamer.
      *
      * @param host Host.
      * @param port Port.
      * @param streamer Streamer.
-     * @param f Stream to entries iterator transformer.
+     * @param converter Element to entry converter.
      */
     public IgniteSocketStreamer(
-            String host,
-            int port,
-            IgniteDataStreamer<K, V> streamer,
-            IgniteClosure<InputStream, Iterator<Map.Entry<K, V>>> f
-            ) {
-
+        String host,
+        int port,
+        IgniteDataStreamer<K, V> streamer,
+        IgniteClosure<E, Map.Entry<K, V>> converter
+    ) {
         A.notNull(streamer, "streamer is null");
         A.notNull(host, "host is null");
-        A.notNull(f, "f is null");
+        A.notNull(converter, "converter is null");
 
         this.host = host;
         this.port = port;
         this.streamer = streamer;
-        this.f = f;
+        this.converter = converter;
     }
 
     /**
      * Performs loading of data stream.
      */
     public void loadData() {
-        try (Socket sock = new Socket(host, port);
-             InputStream is = sock.getInputStream()) {
-
-            for (Iterator<Map.Entry<K, V>> it = f.apply(is); it.hasNext();)
-                streamer.addData(it.next());
+        try (Socket sock = new Socket(host, port)) {
+            loadData(sock);
         }
-        catch (IOException e) {
+        catch (Exception e) {
             throw new IgniteException(e);
         }
     }
 
     /**
-     * Base iterator implementation with next element pre-fetching.
+     * Reads data from socket and loads it into the target data streamer.
      *
-     * @param <E> Element type.
+     * @param sock Socket.
      */
-    abstract static class NextIterator<E> implements Iterator<E>, 
AutoCloseable {
-        /** Closed. */
-        private boolean closed;
-        /** Next value. */
-        private E next;
-        /** Next value is available. */
-        private boolean gotNext;
-
-        /** {@inheritDoc} */
-        @Override public boolean hasNext() {
-            if (!closed && !gotNext) {
-                next = getNext();
-
-                gotNext = true;
+    @SuppressWarnings("unchecked")
+    protected void loadData(Socket sock) throws IOException {
+        try (ObjectInputStream ois = new ObjectInputStream(new 
BufferedInputStream(sock.getInputStream()))) {
+            while (true) {
+                try {
+                    E element = (E) ois.readObject();
+
+                    streamer.addData(converter.apply(element));
+                }
+                catch (EOFException e) {
+                    break;
+                }
+                catch (IOException | ClassNotFoundException e) {
+                    throw new IgniteException(e);
+                }
             }
-
-            return !closed;
-        }
-
-        /** {@inheritDoc} */
-        @Override public E next() {
-            if (!hasNext())
-                throw new NoSuchElementException("End of stream");
-
-            gotNext = false;
-
-            return next;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-        /**
-         * Gets next value from backed iterator. If next element isn't 
available the derived class should
-         * invoke {@link #close()} method and return any value.
-         *
-         * @return Next element or closes the iterator.
-         */
-        protected abstract E getNext();
-
-        /** {@inheritDoc} */
-        @Override public void close() {
-            closed = true;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java
 
b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java
new file mode 100644
index 0000000..6faaeb6
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.streaming;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Data streamer is responsible for streaming data from socket into cache. 
Every line read from socket is treated as
+ * a {@link String} instance and is converted to a key-value pair using the converter.
+ *
+ * @param <K> Cache entry key type.
+ * @param <V> Cache entry value type.
+ */
+public class IgniteTextSocketStreamer<K, V> extends 
IgniteSocketStreamer<String, K, V> {
+    /**
+     * Constructs text socket streamer.
+     *
+     * @param host Host.
+     * @param port Port.
+     * @param streamer Streamer.
+     * @param converter String to entry converter.
+     */
+    public IgniteTextSocketStreamer(
+        String host,
+        int port,
+        IgniteDataStreamer<K, V> streamer,
+        IgniteClosure<String, Map.Entry<K, V>> converter
+    ) {
+        super(host, port, streamer, converter);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void loadData(Socket sock) throws IOException {
+        try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(sock.getInputStream(), "UTF-8"))) {
+            String val;
+
+            while ((val = reader.readLine()) != null)
+                streamer.addData(converter.apply(val));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/modules/core/src/main/java/org/apache/ignite/streaming/ObjectStreamConverter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/streaming/ObjectStreamConverter.java
 
b/modules/core/src/main/java/org/apache/ignite/streaming/ObjectStreamConverter.java
deleted file mode 100644
index 32dac48..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/streaming/ObjectStreamConverter.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streaming;
-
-import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- *
- */
-public class ObjectStreamConverter<K, V> implements IgniteClosure<InputStream, 
Iterator<Map.Entry<K, V>>> {
-    /** Key closure. */
-    private final IgniteClosure<V, K> keyClos;
-
-    public ObjectStreamConverter(IgniteClosure<V, K> keyClos) {
-        this.keyClos = keyClos;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterator<Map.Entry<K, V>> apply(final InputStream is) {
-
-        final IgniteSocketStreamer.NextIterator<V> it;
-
-        try {
-            it = new NextObjectIterator<>(is);
-        }
-        catch (IOException e) {
-            throw new IgniteException(e);
-        }
-
-        return new Iterator<Map.Entry<K, V>>() {
-            @Override public boolean hasNext() {
-                return it.hasNext();
-            }
-
-            @Override public Map.Entry<K, V> next() {
-                V next = it.next();
-
-                return new IgniteBiTuple<>(keyClos.apply(next), next);
-            }
-
-            @Override public void remove() {
-                throw new UnsupportedOperationException();
-            }
-        };
-    }
-
-    private static class NextObjectIterator<E> extends 
IgniteSocketStreamer.NextIterator<E> {
-        /** Object input stream. */
-        private final ObjectInputStream ois;
-
-        public NextObjectIterator(InputStream is) throws IOException {
-            this.ois = new ObjectInputStream(new BufferedInputStream(is));
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override protected E getNext() {
-            try {
-                E obj = null;
-
-                try {
-                    obj = (E) ois.readObject();
-                } catch (EOFException e) {
-                    close();
-                }
-
-                return obj;
-            } catch (IOException | ClassNotFoundException e) {
-                throw new IgniteException(e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/modules/core/src/main/java/org/apache/ignite/streaming/TextStreamConverter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/streaming/TextStreamConverter.java
 
b/modules/core/src/main/java/org/apache/ignite/streaming/TextStreamConverter.java
deleted file mode 100644
index ae63edb..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/streaming/TextStreamConverter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streaming;
-
-import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
-
-import java.io.*;
-import java.util.*;
-
-public class TextStreamConverter implements IgniteClosure<InputStream, 
Iterator<Map.Entry<String, String>>> {
-    /** {@inheritDoc} */
-    @Override public Iterator<Map.Entry<String, String>> apply(final 
InputStream is) {
-
-        final IgniteSocketStreamer.NextIterator<String> it;
-
-        try {
-            it = new NextStringIterator(is);
-        }
-        catch (UnsupportedEncodingException e) {
-            throw new IgniteException(e);
-        }
-
-        return new Iterator<Map.Entry<String, String>>() {
-            @Override public boolean hasNext() {
-                return it.hasNext();
-            }
-
-            @Override public Map.Entry<String, String> next() {
-                String[] pair = it.next().split("=");
-
-                return new IgniteBiTuple<>(pair[0], pair[1]);
-            }
-
-            @Override public void remove() {
-                throw new UnsupportedOperationException();
-            }
-        };
-    }
-
-    private static class NextStringIterator extends 
IgniteSocketStreamer.NextIterator<String> {
-        /** Buffered reader. */
-        private final BufferedReader reader;
-
-        public NextStringIterator(InputStream is) throws 
UnsupportedEncodingException {
-            this.reader = new BufferedReader(new InputStreamReader(is, 
"UTF-8"));
-        }
-
-        /** {@inheritDoc} */
-        @Override protected String getNext() {
-            try {
-                String val = reader.readLine();
-
-                if (val == null)
-                    close();
-
-                return val;
-            }
-            catch (Exception e) {
-                throw new IgniteException(e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
new file mode 100644
index 0000000..1bb0088
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.streaming;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Test for data loading using {@link IgniteSocketStreamer}.
+ */
+public class IgniteSocketStreamerTest extends GridCommonAbstractTest {
+    /** Host. */
+    private static final String HOST = "localhost";
+
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /** Entry count. */
+    private static final int ENTRY_CNT = 50000;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+
+        ccfg.setBackups(1);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * Tests data loading.
+     */
+    public void testLoadData() throws Exception {
+        try (Ignite g = startGrid()) {
+
+            IgniteCache<Integer, String> cache = g.jcache(null);
+
+            cache.clear();
+
+            try (IgniteDataStreamer<Integer, String> stmr = 
g.dataStreamer(null)) {
+
+                startServer();
+
+                IgniteClosure<IgniteBiTuple<Integer, String>, 
Map.Entry<Integer, String>> converter =
+                    new IgniteClosure<IgniteBiTuple<Integer, String>, 
Map.Entry<Integer, String>>() {
+                        @Override public Map.Entry<Integer, String> 
apply(IgniteBiTuple<Integer, String> input) {
+                            return new IgniteBiTuple<>(input.getKey(), 
input.getValue());
+                        }
+                    };
+
+                IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, 
String> sockStmr =
+                    new IgniteSocketStreamer<>(HOST, PORT, stmr, converter);
+
+                sockStmr.loadData();
+            }
+
+            assertEquals(ENTRY_CNT, cache.size());
+        } finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Starts a server thread that accepts one connection and writes {@code ENTRY_CNT} tuples into the socket.
+     */
+    private static void startServer() {
+        new Thread() {
+            @Override public void run() {
+                try (ServerSocket srvSock = new ServerSocket(PORT);
+                     Socket sock = srvSock.accept();
+                     ObjectOutputStream oos =
+                         new ObjectOutputStream(new 
BufferedOutputStream(sock.getOutputStream()))) {
+
+                    for (int i = 0; i < ENTRY_CNT; i++)
+                        oos.writeObject(new IgniteBiTuple<>(i, 
Integer.toString(i)));
+                }
+                catch (IOException e) {
+                    // No-op.
+                }
+            }
+        }.start();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
new file mode 100644
index 0000000..34318c8
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
@@ -0,0 +1,102 @@
+package org.apache.ignite.streaming;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ * Test for data loading using {@link IgniteTextSocketStreamer}.
+ */
+public class IgniteTextSocketStreamerTest extends GridCommonAbstractTest {
+    /** Host. */
+    private static final String HOST = "localhost";
+
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /** Entry count. */
+    private static final int ENTRY_CNT = 50000;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+
+        ccfg.setBackups(1);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * Tests data loading.
+     */
+    public void testLoadData() throws Exception {
+        try (Ignite g = startGrid()) {
+
+            IgniteCache<Integer, String> cache = g.jcache(null);
+
+            cache.clear();
+
+            try (IgniteDataStreamer<Integer, String> stmr = 
g.dataStreamer(null)) {
+
+                startServer();
+
+                IgniteClosure<String, Map.Entry<Integer, String>> converter =
+                    new IgniteClosure<String, Map.Entry<Integer, String>>() {
+                        @Override public Map.Entry<Integer, String> 
apply(String input) {
+                            String[] pair = input.split("=");
+                            return new 
IgniteBiTuple<>(Integer.parseInt(pair[0]), pair[1]);
+                        }
+                    };
+
+                IgniteTextSocketStreamer<Integer, String> sockStmr =
+                    new IgniteTextSocketStreamer<>(HOST, PORT, stmr, 
converter);
+
+                sockStmr.loadData();
+            }
+
+            assertEquals(ENTRY_CNT, cache.size());
+        } finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Starts a server thread that accepts one connection and writes {@code ENTRY_CNT} "key=value" lines to it.
+     */
+    private static void startServer() {
+        new Thread() {
+            @Override public void run() {
+                try (ServerSocket srvSock = new ServerSocket(PORT);
+                     Socket sock = srvSock.accept();
+                     BufferedWriter writer =
+                         new BufferedWriter(new 
OutputStreamWriter(sock.getOutputStream(), "UTF-8"))) {
+
+                    for (int i = 0; i < ENTRY_CNT; i++) {
+                        String num = Integer.toString(i);
+
+                        writer.write(num + '=' + num);
+
+                        writer.newLine();
+                    }
+                }
+                catch (IOException e) {
+                    // No-op.
+                }
+            }
+        }.start();
+    }
+}
\ No newline at end of file

Reply via email to