Repository: incubator-ignite Updated Branches: refs/heads/ignite-430 923162280 -> a30ec69a7
ignite-430 IgniteSocketStreamer to stream data from TCP socket. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a30ec69a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a30ec69a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a30ec69a Branch: refs/heads/ignite-430 Commit: a30ec69a7c99862d423ba0bc67226cd3ef213fe8 Parents: 9231622 Author: Andrey Gura <ag...@gridgain.com> Authored: Tue Mar 24 21:09:21 2015 +0300 Committer: Andrey Gura <ag...@gridgain.com> Committed: Tue Mar 24 21:09:21 2015 +0300 ---------------------------------------------------------------------- .../streaming/SocketObjectStreamerExample.java | 130 ------------------ .../streaming/SocketStreamerExample.java | 129 +++++++++++++++++ .../streaming/SocketTextStreamerExample.java | 125 ----------------- .../streaming/TextSocketStreamerExample.java | 137 +++++++++++++++++++ .../ignite/streaming/IgniteSocketStreamer.java | 112 ++++++--------- .../streaming/IgniteTextSocketStreamer.java | 61 +++++++++ .../ignite/streaming/ObjectStreamConverter.java | 92 ------------- .../ignite/streaming/TextStreamConverter.java | 79 ----------- .../streaming/IgniteSocketStreamerTest.java | 113 +++++++++++++++ .../streaming/IgniteTextSocketStreamerTest.java | 102 ++++++++++++++ 10 files changed, 586 insertions(+), 494 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketObjectStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketObjectStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketObjectStreamerExample.java deleted file mode 100644 index 3a72071..0000000 --- 
a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketObjectStreamerExample.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.examples.datagrid.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.streaming.*; - -import java.io.*; -import java.net.*; - -/** - * Demonstrates how cache can be populated with data utilizing {@link IgniteSocketStreamer} API. - * <p> - * Remote nodes should always be started with special configuration file which - * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-cache.xml'}. - * <p> - * Alternatively you can run {@link CacheNodeStartup} in another JVM which will - * start node with {@code examples/config/example-cache.xml} configuration. - */ -public class SocketObjectStreamerExample { - /** Cache name. */ - private static final String CACHE_NAME = "partitioned"; - - /** Number of entries to load. */ - private static final int ENTRY_COUNT = 500000; - - /** Heap size required to run this example. 
*/ - public static final int MIN_MEMORY = 512 * 1024 * 1024; - - /** Streaming server host. */ - private static final String HOST = "localhost"; - - /** Streaming server port. */ - private static final int PORT = 5555; - - /** - * Executes example. - * - * @param args Command line arguments, none required. - * @throws IgniteException If example execution failed. - */ - public static void main(String[] args) throws IgniteException { - ExamplesUtils.checkMinMemory(MIN_MEMORY); - - try (Ignite ignite = Ignition.start("examples/config/example-cache.xml")) { - System.out.println(); - System.out.println(">>> Cache data streamer example started."); - - startServer(); - - // Clean up caches on all nodes before run. - ignite.jcache(CACHE_NAME).clear(); - - System.out.println(); - System.out.println(">>> Cache clear finished."); - - long start = System.currentTimeMillis(); - - try (IgniteDataStreamer<Integer, IgniteBiTuple<Integer, String>> stmr = ignite.dataStreamer(CACHE_NAME)) { - // Configure loader. - stmr.perNodeBufferSize(1024); - stmr.perNodeParallelOperations(8); - - IgniteClosure<IgniteBiTuple<Integer, String>, Integer> keyClos = - new IgniteClosure<IgniteBiTuple<Integer, String>, Integer>() { - @Override public Integer apply(IgniteBiTuple<Integer, String> input) { - return input.getKey(); - } - }; - - IgniteSocketStreamer<Integer, IgniteBiTuple<Integer, String>> sockStmr = - new IgniteSocketStreamer<>(HOST, PORT, stmr, new ObjectStreamConverter<>(keyClos)); - - sockStmr.loadData(); - } - - long end = System.currentTimeMillis(); - - System.out.println(">>> Cache Size " + ignite.jcache(CACHE_NAME).size(CachePeekMode.PRIMARY)); - - System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + (end - start) + "ms."); - } - } - - /** - * Starts streaming server and writes data into socket. 
- */ - private static void startServer() { - new Thread() { - @Override public void run() { - System.out.println(); - System.out.println(">>> Streaming server thread is started."); - - try (ServerSocket srvSock = new ServerSocket(PORT); - Socket sock = srvSock.accept(); - ObjectOutputStream oos = - new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()))) { - - for (int i = 0; i < ENTRY_COUNT; i++) - oos.writeObject(new IgniteBiTuple<>(i, Integer.toString(i))); - } - catch (IOException e) { - // No-op. - } - - System.out.println(); - System.out.println(">>> Streaming server thread is finished."); - } - }.start(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java new file mode 100644 index 0000000..234ef76 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.streaming; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.examples.datagrid.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.streaming.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Demonstrates how cache can be populated with data utilizing {@link IgniteSocketStreamer} API. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-cache.xml'}. + * <p> + * Alternatively you can run {@link CacheNodeStartup} in another JVM which will + * start node with {@code examples/config/example-cache.xml} configuration. + */ +public class SocketStreamerExample { + /** Cache name. */ + private static final String CACHE_NAME = "partitioned"; + + /** Number of entries to load. */ + private static final int ENTRY_COUNT = 500000; + + /** Heap size required to run this example. */ + public static final int MIN_MEMORY = 512 * 1024 * 1024; + + /** Streaming server host. */ + private static final String HOST = "localhost"; + + /** Streaming server port. */ + private static final int PORT = 5555; + + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + ExamplesUtils.checkMinMemory(MIN_MEMORY); + + try (Ignite ignite = Ignition.start("examples/config/example-cache.xml")) { + System.out.println(); + System.out.println(">>> Cache data streamer example started."); + + startServer(); + + // Clean up caches on all nodes before run. 
+ ignite.jcache(CACHE_NAME).clear(); + + System.out.println(); + System.out.println(">>> Cache clear finished."); + + long start = System.currentTimeMillis(); + + try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(CACHE_NAME)) { + // Configure loader. + stmr.perNodeBufferSize(1024); + stmr.perNodeParallelOperations(8); + + IgniteClosure<IgniteBiTuple<Integer, String>, Map.Entry<Integer, String>> converter = + new IgniteClosure<IgniteBiTuple<Integer, String>, Map.Entry<Integer, String>>() { + @Override public Map.Entry<Integer, String> apply(IgniteBiTuple<Integer, String> input) { + return new IgniteBiTuple<>(input.getKey(), input.getValue()); + } + }; + + IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String> sockStmr = + new IgniteSocketStreamer<>(HOST, PORT, stmr, converter); + + sockStmr.loadData(); + } + + long end = System.currentTimeMillis(); + + System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + (end - start) + "ms."); + } + } + + /** + * Starts streaming server and writes data into socket. + */ + private static void startServer() { + new Thread() { + @Override public void run() { + System.out.println(); + System.out.println(">>> Streaming server thread is started."); + + try (ServerSocket srvSock = new ServerSocket(PORT); + Socket sock = srvSock.accept(); + ObjectOutputStream oos = + new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()))) { + + for (int i = 0; i < ENTRY_COUNT; i++) + oos.writeObject(new IgniteBiTuple<>(i, Integer.toString(i))); + } + catch (IOException e) { + // No-op. 
+ } + + System.out.println(); + System.out.println(">>> Streaming server thread is finished."); + } + }.start(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketTextStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketTextStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketTextStreamerExample.java deleted file mode 100644 index 9a2094f..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketTextStreamerExample.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.examples.datagrid.*; -import org.apache.ignite.streaming.*; - -import java.io.*; -import java.net.*; - -/** - * Demonstrates how cache can be populated with data utilizing {@link IgniteSocketStreamer} API. 
- * <p> - * Remote nodes should always be started with special configuration file which - * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-cache.xml'}. - * <p> - * Alternatively you can run {@link CacheNodeStartup} in another JVM which will - * start node with {@code examples/config/example-cache.xml} configuration. - */ -public class SocketTextStreamerExample { - /** Cache name. */ - private static final String CACHE_NAME = "partitioned"; - - /** Number of entries to load. */ - private static final int ENTRY_COUNT = 500000; - - /** Heap size required to run this example. */ - public static final int MIN_MEMORY = 512 * 1024 * 1024; - - /** Streaming server host. */ - private static final String HOST = "localhost"; - - /** Streaming server port. */ - private static final int PORT = 5555; - - /** - * Executes example. - * - * @param args Command line arguments, none required. - * @throws IgniteException If example execution failed. - */ - public static void main(String[] args) throws IgniteException { - ExamplesUtils.checkMinMemory(MIN_MEMORY); - - try (Ignite ignite = Ignition.start("examples/config/example-cache.xml")) { - System.out.println(); - System.out.println(">>> Cache data streamer example started."); - - startServer(); - - // Clean up caches on all nodes before run. - ignite.jcache(CACHE_NAME).clear(); - - System.out.println(); - System.out.println(">>> Cache clear finished."); - - long start = System.currentTimeMillis(); - - try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(CACHE_NAME)) { - // Configure loader. 
- stmr.perNodeBufferSize(1024); - stmr.perNodeParallelOperations(8); - - IgniteSocketStreamer<String, String> sockStmr = - new IgniteSocketStreamer<>(HOST, PORT, stmr, new TextStreamConverter()); - - sockStmr.loadData(); - } - - long end = System.currentTimeMillis(); - - System.out.println(">>> Cache Size " + ignite.jcache(CACHE_NAME).size(CachePeekMode.PRIMARY)); - - System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + (end - start) + "ms."); - } - } - - /** - * Starts streaming server and writes data into socket. - */ - private static void startServer() { - new Thread() { - @Override public void run() { - System.out.println(); - System.out.println(">>> Streaming server thread is started."); - - try (ServerSocket srvSock = new ServerSocket(PORT); - Socket sock = srvSock.accept(); - BufferedWriter writer = - new BufferedWriter(new OutputStreamWriter(sock.getOutputStream(), "UTF-8"))) { - - for (int i = 0; i < ENTRY_COUNT; i++) { - String num = Integer.toString(i); - writer.write(num + '=' + num); - writer.newLine(); - } - } - catch (IOException e) { - // No-op. - } - - System.out.println(); - System.out.println(">>> Streaming server thread is finished."); - } - }.start(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java new file mode 100644 index 0000000..d6c6daa --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.streaming; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.examples.datagrid.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.streaming.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Demonstrates how cache can be populated with data utilizing {@link IgniteTextSocketStreamer} API. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-cache.xml'}. + * <p> + * Alternatively you can run {@link CacheNodeStartup} in another JVM which will + * start node with {@code examples/config/example-cache.xml} configuration. + */ +public class TextSocketStreamerExample { + /** Cache name. */ + private static final String CACHE_NAME = "partitioned"; + + /** Number of entries to load. */ + private static final int ENTRY_COUNT = 500000; + + /** Heap size required to run this example. */ + public static final int MIN_MEMORY = 512 * 1024 * 1024; + + /** Streaming server host. */ + private static final String HOST = "localhost"; + + /** Streaming server port. 
*/ + private static final int PORT = 5555; + + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + ExamplesUtils.checkMinMemory(MIN_MEMORY); + + try (Ignite ignite = Ignition.start("examples/config/example-cache.xml")) { + System.out.println(); + System.out.println(">>> Cache data streamer example started."); + + startServer(); + + // Clean up caches on all nodes before run. + ignite.jcache(CACHE_NAME).clear(); + + System.out.println(); + System.out.println(">>> Cache clear finished."); + + long start = System.currentTimeMillis(); + + try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(CACHE_NAME)) { + // Configure loader. + stmr.perNodeBufferSize(1024); + stmr.perNodeParallelOperations(8); + + IgniteClosure<String, Map.Entry<Integer, String>> converter = + new IgniteClosure<String, Map.Entry<Integer, String>>() { + @Override public Map.Entry<Integer, String> apply(String input) { + String[] pair = input.split("="); + return new IgniteBiTuple<>(Integer.parseInt(pair[0]), pair[1]); + } + }; + + IgniteTextSocketStreamer<Integer, String> sockStmr = + new IgniteTextSocketStreamer<>(HOST, PORT, stmr, converter); + + sockStmr.loadData(); + } + + long end = System.currentTimeMillis(); + + System.out.println(">>> Cache Size " + ignite.jcache(CACHE_NAME).size(CachePeekMode.PRIMARY)); + + System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + (end - start) + "ms."); + } + } + + /** + * Starts streaming server and writes data into socket. 
+ */ + private static void startServer() { + new Thread() { + @Override public void run() { + System.out.println(); + System.out.println(">>> Streaming server thread is started."); + + try (ServerSocket srvSock = new ServerSocket(PORT); + Socket sock = srvSock.accept(); + BufferedWriter writer = + new BufferedWriter(new OutputStreamWriter(sock.getOutputStream(), "UTF-8"))) { + + for (int i = 0; i < ENTRY_COUNT; i++) { + String num = Integer.toString(i); + + writer.write(num + '=' + num); + + writer.newLine(); + } + } + catch (IOException e) { + // No-op. + } + + System.out.println(); + System.out.println(">>> Streaming server thread is finished."); + } + }.start(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java index e122b66..54dcdce 100644 --- a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java @@ -18,6 +18,7 @@ package org.apache.ignite.streaming; import org.apache.ignite.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -25,109 +26,84 @@ import java.io.*; import java.net.*; import java.util.*; -public class IgniteSocketStreamer<K, V> { - /** Target streamer. */ - private final IgniteDataStreamer<K, V> streamer; - - /** Stream to entries iterator transformer. */ - private final IgniteClosure<InputStream, Iterator<Map.Entry<K, V>>> f; - +/** + * Data streamer is responsible for streaming data from socket into cache. Every object obtained from socket converts + * to key-value pair using converter. 
+ * + * @param <E> Type of element obtained from socket. + * @param <K> Cache entry key type. + * @param <V> Cache entry value type. + */ +public class IgniteSocketStreamer<E, K, V> { /** Host. */ private final String host; /** Port. */ private final int port; + /** Target streamer. */ + protected final IgniteDataStreamer<K, V> streamer; + + /** Stream to entries iterator transformer. */ + protected final IgniteClosure<E, Map.Entry<K, V>> converter; + /** * Constructs socket streamer. * * @param host Host. * @param port Port. * @param streamer Streamer. - * @param f Stream to entries iterator transformer. + * @param converter Stream to entry converter. */ public IgniteSocketStreamer( - String host, - int port, - IgniteDataStreamer<K, V> streamer, - IgniteClosure<InputStream, Iterator<Map.Entry<K, V>>> f - ) { - + String host, + int port, + IgniteDataStreamer<K, V> streamer, + IgniteClosure<E, Map.Entry<K, V>> converter + ) { A.notNull(streamer, "streamer is null"); A.notNull(host, "host is null"); - A.notNull(f, "f is null"); + A.notNull(converter, "converter is null"); this.host = host; this.port = port; this.streamer = streamer; - this.f = f; + this.converter = converter; } /** * Performs loading of data stream. */ public void loadData() { - try (Socket sock = new Socket(host, port); - InputStream is = sock.getInputStream()) { - - for (Iterator<Map.Entry<K, V>> it = f.apply(is); it.hasNext();) - streamer.addData(it.next()); + try (Socket sock = new Socket(host, port)) { + loadData(sock); } - catch (IOException e) { + catch (Exception e) { throw new IgniteException(e); } } /** - * Base iterator implementation with next element pre-fetching. + * Reads data from socket and loads them into target data stream. * - * @param <E> Element type. + * @param sock Socket. */ - abstract static class NextIterator<E> implements Iterator<E>, AutoCloseable { - /** Closed. */ - private boolean closed; - /** Next value. */ - private E next; - /** Next value is available. 
*/ - private boolean gotNext; - - /** {@inheritDoc} */ - @Override public boolean hasNext() { - if (!closed && !gotNext) { - next = getNext(); - - gotNext = true; + @SuppressWarnings("unchecked") + protected void loadData(Socket sock) throws IOException { + try (ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()))) { + while (true) { + try { + E element = (E) ois.readObject(); + + streamer.addData(converter.apply(element)); + } + catch (EOFException e) { + break; + } + catch (IOException | ClassNotFoundException e) { + throw new IgniteException(e); + } } - - return !closed; - } - - /** {@inheritDoc} */ - @Override public E next() { - if (!hasNext()) - throw new NoSuchElementException("End of stream"); - - gotNext = false; - - return next; - } - - /** {@inheritDoc} */ - @Override public void remove() { - throw new UnsupportedOperationException(); - } - - /** - * Gets next value from backed iterator. If next element isn't available the derived class should - * invoke {@link #close()} method and return any value. - * - * @return Next element or closes the iterator. - */ - protected abstract E getNext(); - - /** {@inheritDoc} */ - @Override public void close() { - closed = true; } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java new file mode 100644 index 0000000..6faaeb6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.streaming; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Data streamer is responsible for streaming data from socket into cache. Every object obtained from socket is treated as + * a {@link String} instance and is converted to a key-value pair using the converter. + * + * @param <K> Cache entry key type. + * @param <V> Cache entry value type. + */ +public class IgniteTextSocketStreamer<K, V> extends IgniteSocketStreamer<String, K, V> { + /** + * Constructs text socket streamer. + * + * @param host Host. + * @param port Port. + * @param streamer Streamer. + * @param converter Stream to entries converter. 
+ */ + public IgniteTextSocketStreamer( + String host, + int port, + IgniteDataStreamer<K, V> streamer, + IgniteClosure<String, Map.Entry<K, V>> converter + ) { + super(host, port, streamer, converter); + } + + /** {@inheritDoc} */ + @Override protected void loadData(Socket sock) throws IOException { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), "UTF-8"))) { + String val; + + while ((val = reader.readLine()) != null) + streamer.addData(converter.apply(val)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/modules/core/src/main/java/org/apache/ignite/streaming/ObjectStreamConverter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/ObjectStreamConverter.java b/modules/core/src/main/java/org/apache/ignite/streaming/ObjectStreamConverter.java deleted file mode 100644 index 32dac48..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streaming/ObjectStreamConverter.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.streaming; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; - -import java.io.*; -import java.util.*; - -/** - * - */ -public class ObjectStreamConverter<K, V> implements IgniteClosure<InputStream, Iterator<Map.Entry<K, V>>> { - /** Key closure. */ - private final IgniteClosure<V, K> keyClos; - - public ObjectStreamConverter(IgniteClosure<V, K> keyClos) { - this.keyClos = keyClos; - } - - /** {@inheritDoc} */ - @Override public Iterator<Map.Entry<K, V>> apply(final InputStream is) { - - final IgniteSocketStreamer.NextIterator<V> it; - - try { - it = new NextObjectIterator<>(is); - } - catch (IOException e) { - throw new IgniteException(e); - } - - return new Iterator<Map.Entry<K, V>>() { - @Override public boolean hasNext() { - return it.hasNext(); - } - - @Override public Map.Entry<K, V> next() { - V next = it.next(); - - return new IgniteBiTuple<>(keyClos.apply(next), next); - } - - @Override public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - - private static class NextObjectIterator<E> extends IgniteSocketStreamer.NextIterator<E> { - /** Object input stream. 
*/ - private final ObjectInputStream ois; - - public NextObjectIterator(InputStream is) throws IOException { - this.ois = new ObjectInputStream(new BufferedInputStream(is)); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected E getNext() { - try { - E obj = null; - - try { - obj = (E) ois.readObject(); - } catch (EOFException e) { - close(); - } - - return obj; - } catch (IOException | ClassNotFoundException e) { - throw new IgniteException(e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/modules/core/src/main/java/org/apache/ignite/streaming/TextStreamConverter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/TextStreamConverter.java b/modules/core/src/main/java/org/apache/ignite/streaming/TextStreamConverter.java deleted file mode 100644 index ae63edb..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streaming/TextStreamConverter.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.ignite.streaming;
-
-import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
-
-import java.io.*;
-import java.nio.charset.*;
-import java.util.*;
-
-/**
- * Closure that turns an {@link InputStream} of UTF-8 text into an iterator over key-value entries.
- * Every line is expected to have the {@code key=value} form. A line is split at the first
- * {@code '='} character, so the value may be empty and may itself contain {@code '='}.
- */
-public class TextStreamConverter implements IgniteClosure<InputStream, Iterator<Map.Entry<String, String>>> {
-    /** Separator between key and value within a line. */
-    private static final char KEY_VAL_SEP = '=';
-
-    /**
-     * Wraps the given stream into a line reader and exposes every line as an entry.
-     *
-     * @param is Input stream that carries UTF-8 encoded {@code key=value} lines.
-     * @return Iterator over parsed entries. {@code next()} throws {@link IgniteException} for a line
-     *      without separator; {@code remove()} is not supported.
-     */
-    @Override public Iterator<Map.Entry<String, String>> apply(final InputStream is) {
-        final IgniteSocketStreamer.NextIterator<String> it = new NextStringIterator(is);
-
-        return new Iterator<Map.Entry<String, String>>() {
-            @Override public boolean hasNext() {
-                return it.hasNext();
-            }
-
-            @Override public Map.Entry<String, String> next() {
-                String line = it.next();
-
-                int sepIdx = line.indexOf(KEY_VAL_SEP);
-
-                // split("=") used to fail with ArrayIndexOutOfBoundsException both for lines without
-                // a separator and for "key=" (trailing empty strings are dropped by split), and it
-                // silently cut values that contain '='. Report malformed input explicitly instead.
-                if (sepIdx < 0)
-                    throw new IgniteException("Failed to parse line ('key=value' format is expected): " + line);
-
-                return new IgniteBiTuple<>(line.substring(0, sepIdx), line.substring(sepIdx + 1));
-            }
-
-            @Override public void remove() {
-                throw new UnsupportedOperationException();
-            }
-        };
-    }
-
-    /**
-     * Iterator that obtains its elements line by line from a {@link BufferedReader}.
-     */
-    private static class NextStringIterator extends IgniteSocketStreamer.NextIterator<String> {
-        /** Buffered reader. */
-        private final BufferedReader reader;
-
-        /**
-         * @param is Stream to read UTF-8 text from.
-         */
-        public NextStringIterator(InputStream is) {
-            // A Charset constant cannot fail the way a charset name can, so no checked exception here.
-            this.reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
-        }
-
-        /**
-         * Reads the next line. When the reader reports the end of the stream ({@code null} line)
-         * {@code close()} is called and {@code null} is returned.
-         *
-         * @return Next line or {@code null} if the end of the stream was reached.
-         * @throws IgniteException If reading or closing fails.
-         */
-        @Override protected String getNext() {
-            try {
-                String line = reader.readLine();
-
-                if (line == null)
-                    close();
-
-                return line;
-            }
-            // Kept broad on purpose: the exceptions declared by close() are not visible from here.
-            catch (Exception e) {
-                throw new IgniteException(e);
-            }
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
new file mode 100644
index 0000000..1bb0088
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.streaming;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Test for data loading using {@link IgniteSocketStreamer}.
+ */
+public class IgniteSocketStreamerTest extends GridCommonAbstractTest {
+    /** Host. */
+    private static final String HOST = "localhost";
+
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /** Entry count. */
+    private static final int ENTRY_CNT = 50000;
+
+    /** Timeout for accepting the client connection and for joining the server thread, in milliseconds. */
+    private static final int SRV_TIMEOUT = 10000;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+
+        ccfg.setBackups(1);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * Tests data loading: a local server writes {@link #ENTRY_CNT} serialized tuples into a socket and
+     * the streamer is expected to put all of them into the cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLoadData() throws Exception {
+        try (Ignite g = startGrid()) {
+            IgniteCache<Integer, String> cache = g.jcache(null);
+
+            cache.clear();
+
+            // Failure raised by the server thread, if any.
+            AtomicReference<IOException> srvErr = new AtomicReference<>();
+
+            Thread srv;
+
+            try (IgniteDataStreamer<Integer, String> stmr = g.dataStreamer(null)) {
+                // The port is already bound when this call returns, so the streamer cannot connect too early.
+                srv = startServer(srvErr);
+
+                IgniteClosure<IgniteBiTuple<Integer, String>, Map.Entry<Integer, String>> converter =
+                    new IgniteClosure<IgniteBiTuple<Integer, String>, Map.Entry<Integer, String>>() {
+                        @Override public Map.Entry<Integer, String> apply(IgniteBiTuple<Integer, String> input) {
+                            return new IgniteBiTuple<>(input.getKey(), input.getValue());
+                        }
+                    };
+
+                IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String> sockStmr =
+                    new IgniteSocketStreamer<>(HOST, PORT, stmr, converter);
+
+                sockStmr.loadData();
+            }
+
+            // Wait for the server thread so that a failure recorded by it is visible below.
+            srv.join(SRV_TIMEOUT);
+
+            if (srvErr.get() != null)
+                throw new IOException("Streaming server failed.", srvErr.get());
+
+            assertEquals(ENTRY_CNT, cache.size());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Binds the server socket in the calling thread and starts a daemon thread that accepts a single
+     * connection and writes {@link #ENTRY_CNT} serialized tuples into it.
+     *
+     * @param err Holder that receives the I/O failure of the server thread, if one happens.
+     * @return Started server thread.
+     * @throws IOException If the server socket cannot be bound.
+     */
+    private static Thread startServer(final AtomicReference<IOException> err) throws IOException {
+        final ServerSocket srvSock = new ServerSocket(PORT);
+
+        // Do not block in accept() forever if the client never connects.
+        srvSock.setSoTimeout(SRV_TIMEOUT);
+
+        Thread srv = new Thread() {
+            @Override public void run() {
+                try (ServerSocket srvSock0 = srvSock;
+                    Socket sock = srvSock0.accept();
+                    ObjectOutputStream oos =
+                        new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()))) {
+
+                    for (int i = 0; i < ENTRY_CNT; i++)
+                        oos.writeObject(new IgniteBiTuple<>(i, Integer.toString(i)));
+                }
+                catch (IOException e) {
+                    // Hand the failure over to the test thread instead of swallowing it.
+                    err.set(e);
+                }
+            }
+        };
+
+        srv.setDaemon(true);
+
+        srv.start();
+
+        return srv;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a30ec69a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
new file mode 100644
index 0000000..34318c8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.streaming;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ * Test for data loading using {@link IgniteTextSocketStreamer}.
+ */
+public class IgniteTextSocketStreamerTest extends GridCommonAbstractTest {
+    /** Host. */
+    private static final String HOST = "localhost";
+
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /** Entry count. */
+    private static final int ENTRY_CNT = 50000;
+
+    /** Timeout for accepting the client connection and for joining the server thread, in milliseconds. */
+    private static final int SRV_TIMEOUT = 10000;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+
+        ccfg.setBackups(1);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * Tests data loading: a local server writes {@link #ENTRY_CNT} {@code key=value} lines into a socket
+     * and the streamer is expected to put all of them into the cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLoadData() throws Exception {
+        try (Ignite g = startGrid()) {
+            IgniteCache<Integer, String> cache = g.jcache(null);
+
+            cache.clear();
+
+            // Failure raised by the server thread, if any.
+            AtomicReference<IOException> srvErr = new AtomicReference<>();
+
+            Thread srv;
+
+            try (IgniteDataStreamer<Integer, String> stmr = g.dataStreamer(null)) {
+                // The port is already bound when this call returns, so the streamer cannot connect too early.
+                srv = startServer(srvErr);
+
+                IgniteClosure<String, Map.Entry<Integer, String>> converter =
+                    new IgniteClosure<String, Map.Entry<Integer, String>>() {
+                        @Override public Map.Entry<Integer, String> apply(String input) {
+                            String[] pair = input.split("=");
+
+                            return new IgniteBiTuple<>(Integer.parseInt(pair[0]), pair[1]);
+                        }
+                    };
+
+                IgniteTextSocketStreamer<Integer, String> sockStmr =
+                    new IgniteTextSocketStreamer<>(HOST, PORT, stmr, converter);
+
+                sockStmr.loadData();
+            }
+
+            // Wait for the server thread so that a failure recorded by it is visible below.
+            srv.join(SRV_TIMEOUT);
+
+            if (srvErr.get() != null)
+                throw new IOException("Streaming server failed.", srvErr.get());
+
+            assertEquals(ENTRY_CNT, cache.size());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Binds the server socket in the calling thread and starts a daemon thread that accepts a single
+     * connection and writes {@link #ENTRY_CNT} UTF-8 encoded {@code key=value} lines into it.
+     *
+     * @param err Holder that receives the I/O failure of the server thread, if one happens.
+     * @return Started server thread.
+     * @throws IOException If the server socket cannot be bound.
+     */
+    private static Thread startServer(final AtomicReference<IOException> err) throws IOException {
+        final ServerSocket srvSock = new ServerSocket(PORT);
+
+        // Do not block in accept() forever if the client never connects.
+        srvSock.setSoTimeout(SRV_TIMEOUT);
+
+        Thread srv = new Thread() {
+            @Override public void run() {
+                try (ServerSocket srvSock0 = srvSock;
+                    Socket sock = srvSock0.accept();
+                    BufferedWriter writer =
+                        new BufferedWriter(new OutputStreamWriter(sock.getOutputStream(), "UTF-8"))) {
+
+                    for (int i = 0; i < ENTRY_CNT; i++) {
+                        String num = Integer.toString(i);
+
+                        writer.write(num + '=' + num);
+
+                        writer.newLine();
+                    }
+                }
+                catch (IOException e) {
+                    // Hand the failure over to the test thread instead of swallowing it.
+                    err.set(e);
+                }
+            }
+        };
+
+        srv.setDaemon(true);
+
+        srv.start();
+
+        return srv;
+    }
+}
\ No newline at end of file