Repository: incubator-ignite Updated Branches: refs/heads/ignite-430 2077a88e4 -> 98d9511a4
ignite-430 IgniteSocketStreamer to stream data from TCP socket. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/98d9511a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/98d9511a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/98d9511a Branch: refs/heads/ignite-430 Commit: 98d9511a441c209609b3dcebf29dca24efcfa011 Parents: 2077a88 Author: Andrey Gura <ag...@gridgain.com> Authored: Tue Mar 31 20:49:30 2015 +0300 Committer: Andrey Gura <ag...@gridgain.com> Committed: Tue Mar 31 20:49:30 2015 +0300 ---------------------------------------------------------------------- .../streaming/SocketStreamerExample.java | 2 +- .../streaming/TextSocketStreamerExample.java | 2 +- .../stream/socket/IgniteSocketStreamer.java | 98 +++++++++ .../stream/socket/IgniteTextSocketStreamer.java | 83 ++++++++ .../apache/ignite/stream/socket/Receiver.java | 182 +++++++++++++++++ .../ignite/stream/socket/package-info.java | 21 ++ .../ignite/streaming/IgniteSocketStreamer.java | 98 --------- .../streaming/IgniteTextSocketStreamer.java | 83 -------- .../org/apache/ignite/streaming/Receiver.java | 184 ----------------- .../apache/ignite/streaming/package-info.java | 21 -- .../stream/socket/IgniteSocketStreamerTest.java | 135 ++++++++++++ .../socket/IgniteTextSocketStreamerTest.java | 123 +++++++++++ .../ignite/stream/socket/ReceiverTest.java | 202 ++++++++++++++++++ .../streaming/IgniteSocketStreamerTest.java | 134 ------------ .../streaming/IgniteTextSocketStreamerTest.java | 122 ----------- .../apache/ignite/streaming/ReceiverTest.java | 203 ------------------- pom.xml | 4 +- 17 files changed, 848 insertions(+), 849 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java index 5fea93e..7fdbea8 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java @@ -22,7 +22,7 @@ import org.apache.ignite.examples.*; import org.apache.ignite.examples.streaming.numbers.*; import org.apache.ignite.lang.*; import org.apache.ignite.stream.*; -import org.apache.ignite.streaming.*; +import org.apache.ignite.stream.socket.*; import javax.cache.processor.*; import java.io.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java index 4677be1..93a70ae 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java @@ -22,7 +22,7 @@ import org.apache.ignite.examples.*; import org.apache.ignite.examples.streaming.numbers.*; import org.apache.ignite.lang.*; import org.apache.ignite.stream.*; -import org.apache.ignite.streaming.*; +import org.apache.ignite.stream.socket.*; import javax.cache.processor.*; import java.io.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java new file mode 100644 index 0000000..3540393 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.socket; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Data streamer is responsible for streaming data from socket into cache. Every object obtained from socket converts + * to key-value pair using converter. + * + * @param <E> Type of element obtained from socket. + * @param <K> Cache entry key type. + * @param <V> Cache entry value type. + */ +public class IgniteSocketStreamer<E, K, V> extends Receiver<E, K, V> { + /** Host. */ + private final String host; + + /** Port. */ + private final int port; + + /** + * Constructs socket streamer. + * + * @param host Host. + * @param port Port. + * @param streamer Streamer. + * @param converter Stream to entry converter. + */ + public IgniteSocketStreamer( + String host, + int port, + IgniteDataStreamer<K, V> streamer, + IgniteClosure<E, Map.Entry<K, V>> converter + ) { + super(streamer, converter); + + A.notNull(host, "host is null"); + + this.host = host; + this.port = port; + } + + /** {@inheritDoc} */ + @Override protected void receive() { + try (Socket sock = new Socket(host, port)) { + receive(sock); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + + /** + * Reads data from socket and adds them into target data stream. + * + * @param sock Socket. + */ + @SuppressWarnings("unchecked") + private void receive(Socket sock) throws IOException { + try (ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()))) { + while (!isStopped()) { + try { + E element = (E) ois.readObject(); + + addData(element); + } + catch (EOFException e) { + break; + } + catch (IOException | ClassNotFoundException e) { + throw new IgniteException(e); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteTextSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteTextSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteTextSocketStreamer.java new file mode 100644 index 0000000..0852317 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteTextSocketStreamer.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.socket; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Data streamer is responsible for streaming data from socket into cache. Every object obtained from socket treats as + * {@link String} instance and converts to key-value pair using converter. + * + * @param <K> Cache entry key type. + * @param <V> Cache entry value type. + */ +public class IgniteTextSocketStreamer<K, V> extends Receiver<String, K, V> { + /** Host. */ + private final String host; + + /** Port. */ + private final int port; + + /** + * Constructs text socket streamer. + * + * @param host Host. + * @param port Port. + * @param streamer Streamer. + * @param converter Stream to entries converter. + */ + public IgniteTextSocketStreamer( + String host, + int port, + IgniteDataStreamer<K, V> streamer, + IgniteClosure<String, Map.Entry<K, V>> converter + ) { + super(streamer, converter); + + A.notNull(host, "host is null"); + + this.host = host; + this.port = port; + } + + /** {@inheritDoc} */ + @Override protected void receive() { + try (Socket sock = new Socket(host, port)) { + loadData(sock); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + private void loadData(Socket sock) throws IOException { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), "UTF-8"))) { + String val; + + while (!isStopped() && (val = reader.readLine()) != null) + addData(val); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/main/java/org/apache/ignite/stream/socket/Receiver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/Receiver.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/Receiver.java new file mode 100644 index 0000000..72c24ad --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/Receiver.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.socket; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; + +import java.util.*; + +/** + * Base implementation of data receiver. + * + * @param <E> Type of stream element. + * @param <K> Type of cache entry key. + * @param <V> Type of cache entry value. + */ +public abstract class Receiver<E, K, V> { + /** Object monitor. */ + private final Object lock = new Object(); + + /** Worker. */ + private Thread worker; + + /** State. */ + private volatile State state = State.INITIALIZED; + + /** Target streamer. */ + private final IgniteDataStreamer<K, V> streamer; + + /** Element to entries transformer. */ + private final IgniteClosure<E, Map.Entry<K, V>> converter; + + /** Restart interval in milliseconds. */ + private volatile long restartInterval = 2000; + + /** + * Constructs stream receiver. + * + * @param streamer Streamer. + * @param converter Element to entries transformer. + */ + public Receiver(IgniteDataStreamer<K, V> streamer, IgniteClosure<E, Map.Entry<K, V>> converter) { + A.notNull(streamer, "streamer is null"); + A.notNull(converter, "converter is null"); + + this.streamer = streamer; + this.converter = converter; + } + + /** + * Sets restart interval in milliseconds. + * + * @param interval Interval in milliseconds. + */ + public void restartInterval(long interval) { + A.ensure(interval > 0, "interval > 0"); + + this.restartInterval = interval; + } + + /** + * Starts receiver. + */ + public void start() { + synchronized (lock) { + if (state != State.INITIALIZED) + throw new IllegalStateException("Receiver in " + state + " state can't be started."); + + worker = new Thread(new ReceiverWorker()); + + worker.start(); + + state = State.STARTED; + } + } + + /** + * Stops receiver. + */ + public void stop() { + synchronized (lock) { + if (state != State.STARTED) + throw new IllegalStateException("Receiver in " + state + " state can't be stopped."); + + state = State.STOPPED; + } + + try { + worker.join(); + } + catch (InterruptedException e) { + // No-op. + } + } + + /** + * Checks whether receiver is started or not. + * + * @return {@code True} if receiver is started, {@code false} - otherwise. + */ + public boolean isStarted() { + return state == State.STARTED; + } + + /** + * Checks whether receiver is stopped or not. + * + * @return {@code True} if receiver is stopped, {@code false} - otherwise. + */ + public boolean isStopped() { + return state == State.STOPPED; + } + + /** + * Performs actual data receiving. + */ + protected abstract void receive(); + + /** + * Convert stream data to cache entry and transfer it to the target streamer. + * + * @param element Element. + */ + protected void addData(E element) { + streamer.addData(converter.apply(element)); + } + + /** + * Receiver state. + */ + public enum State { + /** New. */ + INITIALIZED, + /** Started. */ + STARTED, + /** Stopped. */ + STOPPED + } + + /** + * Receiver worker that actually receives data from socket. + */ + private class ReceiverWorker implements Runnable { + /** {@inheritDoc} */ + @Override public void run() { + while (true) { + try { + receive(); + } + catch (Exception e) { + // No-op. + } + + if (isStopped()) + return; + + try { + Thread.sleep(restartInterval); + } + catch (InterruptedException e) { + // No-op. + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java new file mode 100644 index 0000000..2644b33 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains Ignite socket streaming classes. + */ +package org.apache.ignite.stream.socket; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java deleted file mode 100644 index 72b6082..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streaming; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -/** - * Data streamer is responsible for streaming data from socket into cache. Every object obtained from socket converts - * to key-value pair using converter. - * - * @param <E> Type of element obtained from socket. - * @param <K> Cache entry key type. - * @param <V> Cache entry value type. - */ -public class IgniteSocketStreamer<E, K, V> extends Receiver<E, K, V> { - /** Host. */ - private final String host; - - /** Port. */ - private final int port; - - /** - * Constructs socket streamer. - * - * @param host Host. - * @param port Port. - * @param streamer Streamer. - * @param converter Stream to entry converter. - */ - public IgniteSocketStreamer( - String host, - int port, - IgniteDataStreamer<K, V> streamer, - IgniteClosure<E, Map.Entry<K, V>> converter - ) { - super(streamer, converter); - - A.notNull(host, "host is null"); - - this.host = host; - this.port = port; - } - - /** {@inheritDoc} */ - @Override protected void receive() { - try (Socket sock = new Socket(host, port)) { - receive(sock); - } - catch (Exception e) { - throw new IgniteException(e); - } - } - - /** - * Reads data from socket and adds them into target data stream. - * - * @param sock Socket. - */ - @SuppressWarnings("unchecked") - private void receive(Socket sock) throws IOException { - try (ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()))) { - while (!isStopped()) { - try { - E element = (E) ois.readObject(); - - addData(element); - } - catch (EOFException e) { - break; - } - catch (IOException | ClassNotFoundException e) { - throw new IgniteException(e); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java deleted file mode 100644 index 8094d37..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streaming; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -/** - * Data streamer is responsible for streaming data from socket into cache. Every object obtained from socket treats as - * {@link String} instance and converts to key-value pair using converter. - * - * @param <K> Cache entry key type. - * @param <V> Cache entry value type. - */ -public class IgniteTextSocketStreamer<K, V> extends Receiver<String, K, V> { - /** Host. */ - private final String host; - - /** Port. */ - private final int port; - - /** - * Constructs text socket streamer. - * - * @param host Host. - * @param port Port. - * @param streamer Streamer. - * @param converter Stream to entries converter. - */ - public IgniteTextSocketStreamer( - String host, - int port, - IgniteDataStreamer<K, V> streamer, - IgniteClosure<String, Map.Entry<K, V>> converter - ) { - super(streamer, converter); - - A.notNull(host, "host is null"); - - this.host = host; - this.port = port; - } - - /** {@inheritDoc} */ - @Override protected void receive() { - try (Socket sock = new Socket(host, port)) { - loadData(sock); - } - catch (Exception e) { - throw new IgniteException(e); - } - } - - /** {@inheritDoc} */ - private void loadData(Socket sock) throws IOException { - try (BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), "UTF-8"))) { - String val; - - while (!isStopped() && (val = reader.readLine()) != null) - addData(val); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/main/java/org/apache/ignite/streaming/Receiver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/Receiver.java b/modules/core/src/main/java/org/apache/ignite/streaming/Receiver.java deleted file mode 100644 index 71a59bf..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streaming/Receiver.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streaming; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * Base implementation of data receiver. - * - * @param <E> Type of stream element. - * @param <K> Type of cache entry key. - * @param <V> Type of cache entry value. - */ -public abstract class Receiver<E, K, V> { - /** Object monitor. */ - private final Object lock = new Object(); - - /** Stop latch. */ - private final CountDownLatch stopLatch = new CountDownLatch(1); - - /** State. */ - private volatile State state = State.INITIALIZED; - - /** Target streamer. */ - private final IgniteDataStreamer<K, V> streamer; - - /** Element to entries transformer. */ - private final IgniteClosure<E, Map.Entry<K, V>> converter; - - /** Restart interval in milliseconds. */ - private volatile long restartInterval = 2000; - - /** - * Constructs stream receiver. - * - * @param streamer Streamer. - * @param converter Element to entries transformer. - */ - public Receiver(IgniteDataStreamer<K, V> streamer, IgniteClosure<E, Map.Entry<K, V>> converter) { - A.notNull(streamer, "streamer is null"); - A.notNull(converter, "converter is null"); - - this.streamer = streamer; - this.converter = converter; - } - - /** - * Sets restart interval in milliseconds. - * - * @param interval Interval in milliseconds. - */ - public void restartInterval(long interval) { - A.ensure(interval > 0, "interval > 0"); - - this.restartInterval = interval; - } - - /** - * Starts receiver. - */ - public void start() { - synchronized (lock) { - if (state != State.INITIALIZED) - throw new IllegalStateException("Receiver in " + state + " state can't be started."); - - new Thread(new ReceiverWorker()).start(); - - state = State.STARTED; - } - } - - /** - * Stops receiver. - */ - public void stop() { - synchronized (lock) { - if (state != State.STARTED) - throw new IllegalStateException("Receiver in " + state + " state can't be stopped."); - - state = State.STOPPED; - - try { - stopLatch.await(); - } - catch (InterruptedException e) { - // No-op. - } - } - } - - /** - * Checks whether receiver is started or not. - * - * @return {@code True} if receiver is started, {@code false} - otherwise. - */ - public boolean isStarted() { - return state == State.STARTED; - } - - /** - * Checks whether receiver is stopped or not. - * - * @return {@code True} if receiver is stopped, {@code false} - otherwise. - */ - public boolean isStopped() { - return state == State.STOPPED; - } - - /** - * Performs actual data receiving. - */ - protected abstract void receive(); - - /** - * Convert stream data to cache entry and transfer it to the target streamer. - * - * @param element Element. - */ - protected void addData(E element) { - streamer.addData(converter.apply(element)); - } - - /** - * Receiver state. - */ - public enum State { - /** New. */ - INITIALIZED, - /** Started. */ - STARTED, - /** Stopped. */ - STOPPED - } - - /** - * Receiver worker that actually receives data from socket. - */ - private class ReceiverWorker implements Runnable { - /** {@inheritDoc} */ - @Override public void run() { - while (true) { - try { - receive(); - } - catch (Throwable e) { - // No-op. - } - - if (isStopped()) { - stopLatch.countDown(); - - break; - } - - try { - Thread.sleep(restartInterval); - } - catch (InterruptedException e) { - // No-op. - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/main/java/org/apache/ignite/streaming/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/package-info.java b/modules/core/src/main/java/org/apache/ignite/streaming/package-info.java deleted file mode 100644 index 79bb27e..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streaming/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Contains Ignite streaming classes. - */ -package org.apache.ignite.streaming; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerTest.java new file mode 100644 index 0000000..bff973a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.socket; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.stream.socket.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Test for data loading using {@link IgniteSocketStreamer}. + */ +public class IgniteSocketStreamerTest extends GridCommonAbstractTest { + /** Host. */ + private static final String HOST = "localhost"; + + /** Port. */ + private static final int PORT = 5555; + + /** Entry count. */ + private static final int ENTRY_CNT = 5000; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + + ccfg.setBackups(1); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * Tests data loading. + */ + public void testStreamer() throws Exception { + try (Ignite g = startGrid()) { + + IgniteCache<Integer, String> cache = g.cache(null); + + cache.clear(); + + try (IgniteDataStreamer<Integer, String> stmr = g.dataStreamer(null)) { + + startServer(); + + IgniteClosure<IgniteBiTuple<Integer, String>, Map.Entry<Integer, String>> converter = + new IgniteClosure<IgniteBiTuple<Integer, String>, Map.Entry<Integer, String>>() { + @Override public Map.Entry<Integer, String> apply(IgniteBiTuple<Integer, String> input) { + return new IgniteBiTuple<>(input.getKey(), input.getValue()); + } + }; + + final AtomicInteger cnt = new AtomicInteger(); + + IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String> sockStmr = + new IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String>( + HOST, PORT, stmr, converter + ) { + @Override protected void addData(IgniteBiTuple<Integer, String> element) { + super.addData(element); + + cnt.incrementAndGet(); + } + }; + + sockStmr.start(); + + // Wait for all data streamed. + while (cnt.get() < ENTRY_CNT) + Thread.sleep(100); + + sockStmr.stop(); + + assertFalse(sockStmr.isStarted()); + assertTrue(sockStmr.isStopped()); + } + + assertEquals(ENTRY_CNT, cache.size()); + } + finally { + stopAllGrids(); + } + } + + /** + * Starts streaming server and writes data into socket. + */ + private static void startServer() { + new Thread() { + @Override public void run() { + try (ServerSocket srvSock = new ServerSocket(PORT); + Socket sock = srvSock.accept(); + ObjectOutputStream oos = + new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()))) { + + for (int i = 0; i < ENTRY_CNT; i++) + oos.writeObject(new IgniteBiTuple<>(i, Integer.toString(i))); + } + catch (IOException e) { + // No-op. + } + } + }.start(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteTextSocketStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteTextSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteTextSocketStreamerTest.java new file mode 100644 index 0000000..9a87715 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteTextSocketStreamerTest.java @@ -0,0 +1,123 @@ +package org.apache.ignite.stream.socket; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.stream.socket.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Test for data loading using {@link IgniteTextSocketStreamer}. + */ +public class IgniteTextSocketStreamerTest extends GridCommonAbstractTest { + /** Host. */ + private static final String HOST = "localhost"; + + /** Port. */ + private static final int PORT = 5555; + + /** Entry count. */ + private static final int ENTRY_CNT = 5000; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + + ccfg.setBackups(1); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * Tests data loading. + */ + public void testStream() throws Exception { + try (Ignite g = startGrid()) { + + IgniteCache<Integer, String> cache = g.cache(null); + + cache.clear(); + + try (IgniteDataStreamer<Integer, String> stmr = g.dataStreamer(null)) { + + startServer(); + + IgniteClosure<String, Map.Entry<Integer, String>> converter = + new IgniteClosure<String, Map.Entry<Integer, String>>() { + @Override public Map.Entry<Integer, String> apply(String input) { + String[] pair = input.split("=", 2); + + return new IgniteBiTuple<>(Integer.parseInt(pair[0]), pair[1]); + } + }; + + final AtomicInteger cnt = new AtomicInteger(); + + IgniteTextSocketStreamer<Integer, String> sockStmr = + new IgniteTextSocketStreamer<Integer, String>(HOST, PORT, stmr, converter) { + @Override protected void addData(String element) { + super.addData(element); + + cnt.incrementAndGet(); + } + }; + + sockStmr.start(); + + // Wait for all data streamed. + while (cnt.get() < ENTRY_CNT) + Thread.sleep(100); + + sockStmr.stop(); + + assertFalse(sockStmr.isStarted()); + assertTrue(sockStmr.isStopped()); + } + + assertEquals(ENTRY_CNT, cache.size()); + } + finally { + stopAllGrids(); + } + } + + /** + * Starts streaming server and writes data into socket. + */ + private static void startServer() { + new Thread() { + @Override public void run() { + try (ServerSocket srvSock = new ServerSocket(PORT); + Socket sock = srvSock.accept(); + BufferedWriter writer = + new BufferedWriter(new OutputStreamWriter(sock.getOutputStream(), "UTF-8"))) { + + for (int i = 0; i < ENTRY_CNT; i++) { + String num = Integer.toString(i); + + writer.write(num + '=' + num); + + writer.newLine(); + } + } + catch (IOException e) { + // No-op. + } + } + }.start(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/test/java/org/apache/ignite/stream/socket/ReceiverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/ReceiverTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/ReceiverTest.java new file mode 100644 index 0000000..a6754ba --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/ReceiverTest.java @@ -0,0 +1,202 @@ +package org.apache.ignite.stream.socket; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import junit.framework.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Tests for {@link Receiver}. + */ +public class ReceiverTest extends TestCase { + /** Converter. */ + private static final IgniteClosure<Integer, Map.Entry<Integer, String>> CONVERTER = + new IgniteClosure<Integer, Map.Entry<Integer, String>>() { + @Override public Map.Entry<Integer, String> apply(Integer input) { + return new IgniteBiTuple<>(input, input.toString()); + } + }; + + /** Stmr. */ + private static final IgniteDataStreamer<Integer, String> STMR = new DataStreamerStub<>(); + + /** Receiver. */ + private final Receiver<Integer, Integer, String> receiver = + new Receiver<Integer, Integer, String>(STMR, CONVERTER) { + @Override protected void receive() { + while (!isStopped()) { + try { + Thread.sleep(50); + } + catch (InterruptedException e) { + // No-op. + } + } + } + }; + + /** + * Tests receiver behavior in case of forced termination. + * + * @throws Exception If error occurred. + */ + public void testReceiver() throws Exception { + assertFalse(receiver.isStarted()); + assertFalse(receiver.isStopped()); + + receiver.start(); + + assertTrue(receiver.isStarted()); + assertFalse(receiver.isStopped()); + + // Wait for some period before stop. + Thread.sleep(500); + + receiver.stop(); + + assertFalse(receiver.isStarted()); + assertTrue(receiver.isStopped()); + + try { + receiver.start(); + fail("IllegalStateException expected."); + } + catch (IllegalStateException e) { + // No-op + } + + try { + receiver.stop(); + fail("IllegalStateException expected."); + } + catch (IllegalStateException e) { + // No-op + } + } + + /** + * Receiver stub. + * + * @param <K> Key type. + * @param <V> Value type. + */ + private static class DataStreamerStub<K, V> implements IgniteDataStreamer<K, V> { + /** {@inheritDoc} */ + @Override public String cacheName() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean allowOverwrite() { + return false; + } + + /** {@inheritDoc} */ + @Override public void allowOverwrite(boolean allowOverwrite) throws IgniteException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean skipStore() { + return false; + } + + /** {@inheritDoc} */ + @Override public void skipStore(boolean skipStore) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int perNodeBufferSize() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void perNodeBufferSize(int bufSize) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int perNodeParallelOperations() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void perNodeParallelOperations(int parallelOps) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public long autoFlushFrequency() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void autoFlushFrequency(long autoFlushFreq) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> future() { + return null; + } + + /** {@inheritDoc} */ + @Override public void deployClass(Class<?> depCls) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void receiver(org.apache.ignite.stream.StreamReceiver<K, V> rcvr) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> removeData(K key) throws IgniteException, IllegalStateException { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IllegalStateException { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IllegalStateException { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) + throws IllegalStateException { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException { + return null; + } + + /** {@inheritDoc} */ + @Override public void flush() throws IgniteException, IllegalStateException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void tryFlush() throws IgniteException, IllegalStateException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void close(boolean cancel) throws IgniteException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteException { + // No-op. + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java deleted file mode 100644 index d7357c7..0000000 --- a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streaming; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.cache.CacheMode.*; - -/** - * Test for data loading using {@link IgniteSocketStreamer}. - */ -public class IgniteSocketStreamerTest extends GridCommonAbstractTest { - /** Host. */ - private static final String HOST = "localhost"; - - /** Port. */ - private static final int PORT = 5555; - - /** Entry count. */ - private static final int ENTRY_CNT = 5000; - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - CacheConfiguration ccfg = new CacheConfiguration(); - - ccfg.setCacheMode(PARTITIONED); - - ccfg.setBackups(1); - - cfg.setCacheConfiguration(ccfg); - - return cfg; - } - - /** - * Tests data loading. - */ - public void testStreamer() throws Exception { - try (Ignite g = startGrid()) { - - IgniteCache<Integer, String> cache = g.cache(null); - - cache.clear(); - - try (IgniteDataStreamer<Integer, String> stmr = g.dataStreamer(null)) { - - startServer(); - - IgniteClosure<IgniteBiTuple<Integer, String>, Map.Entry<Integer, String>> converter = - new IgniteClosure<IgniteBiTuple<Integer, String>, Map.Entry<Integer, String>>() { - @Override public Map.Entry<Integer, String> apply(IgniteBiTuple<Integer, String> input) { - return new IgniteBiTuple<>(input.getKey(), input.getValue()); - } - }; - - final AtomicInteger cnt = new AtomicInteger(); - - IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String> sockStmr = - new IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String>( - HOST, PORT, stmr, converter - ) { - @Override protected void addData(IgniteBiTuple<Integer, String> element) { - super.addData(element); - - cnt.incrementAndGet(); - } - }; - - sockStmr.start(); - - // Wait for all data streamed. - while (cnt.get() < ENTRY_CNT) - Thread.sleep(100); - - sockStmr.stop(); - - assertFalse(sockStmr.isStarted()); - assertTrue(sockStmr.isStopped()); - } - - assertEquals(ENTRY_CNT, cache.size()); - } - finally { - stopAllGrids(); - } - } - - /** - * Starts streaming server and writes data into socket. - */ - private static void startServer() { - new Thread() { - @Override public void run() { - try (ServerSocket srvSock = new ServerSocket(PORT); - Socket sock = srvSock.accept(); - ObjectOutputStream oos = - new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()))) { - - for (int i = 0; i < ENTRY_CNT; i++) - oos.writeObject(new IgniteBiTuple<>(i, Integer.toString(i))); - } - catch (IOException e) { - // No-op. - } - } - }.start(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java deleted file mode 100644 index 436bc8f..0000000 --- a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java +++ /dev/null @@ -1,122 +0,0 @@ -package org.apache.ignite.streaming; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.cache.CacheMode.*; - -/** - * Test for data loading using {@link IgniteTextSocketStreamer}. - */ -public class IgniteTextSocketStreamerTest extends GridCommonAbstractTest { - /** Host. */ - private static final String HOST = "localhost"; - - /** Port. */ - private static final int PORT = 5555; - - /** Entry count. */ - private static final int ENTRY_CNT = 5000; - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - CacheConfiguration ccfg = new CacheConfiguration(); - - ccfg.setCacheMode(PARTITIONED); - - ccfg.setBackups(1); - - cfg.setCacheConfiguration(ccfg); - - return cfg; - } - - /** - * Tests data loading. - */ - public void testStream() throws Exception { - try (Ignite g = startGrid()) { - - IgniteCache<Integer, String> cache = g.cache(null); - - cache.clear(); - - try (IgniteDataStreamer<Integer, String> stmr = g.dataStreamer(null)) { - - startServer(); - - IgniteClosure<String, Map.Entry<Integer, String>> converter = - new IgniteClosure<String, Map.Entry<Integer, String>>() { - @Override public Map.Entry<Integer, String> apply(String input) { - String[] pair = input.split("=", 2); - - return new IgniteBiTuple<>(Integer.parseInt(pair[0]), pair[1]); - } - }; - - final AtomicInteger cnt = new AtomicInteger(); - - IgniteTextSocketStreamer<Integer, String> sockStmr = - new IgniteTextSocketStreamer<Integer, String>(HOST, PORT, stmr, converter) { - @Override protected void addData(String element) { - super.addData(element); - - cnt.incrementAndGet(); - } - }; - - sockStmr.start(); - - // Wait for all data streamed. - while (cnt.get() < ENTRY_CNT) - Thread.sleep(100); - - sockStmr.stop(); - - assertFalse(sockStmr.isStarted()); - assertTrue(sockStmr.isStopped()); - } - - assertEquals(ENTRY_CNT, cache.size()); - } - finally { - stopAllGrids(); - } - } - - /** - * Starts streaming server and writes data into socket. - */ - private static void startServer() { - new Thread() { - @Override public void run() { - try (ServerSocket srvSock = new ServerSocket(PORT); - Socket sock = srvSock.accept(); - BufferedWriter writer = - new BufferedWriter(new OutputStreamWriter(sock.getOutputStream(), "UTF-8"))) { - - for (int i = 0; i < ENTRY_CNT; i++) { - String num = Integer.toString(i); - - writer.write(num + '=' + num); - - writer.newLine(); - } - } - catch (IOException e) { - // No-op. - } - } - }.start(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/test/java/org/apache/ignite/streaming/ReceiverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/ReceiverTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/ReceiverTest.java deleted file mode 100644 index 9f4e056..0000000 --- a/modules/core/src/test/java/org/apache/ignite/streaming/ReceiverTest.java +++ /dev/null @@ -1,203 +0,0 @@ -package org.apache.ignite.streaming; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; - -import junit.framework.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Tests for {@link Receiver}. - */ -public class ReceiverTest extends TestCase { - /** Converter. */ - private static final IgniteClosure<Integer, Map.Entry<Integer, String>> CONVERTER = - new IgniteClosure<Integer, Map.Entry<Integer, String>>() { - @Override public Map.Entry<Integer, String> apply(Integer input) { - return new IgniteBiTuple<>(input, input.toString()); - } - }; - - /** Stmr. */ - private static final IgniteDataStreamer<Integer, String> STMR = new DataStreamerStub<>(); - - /** Receiver. */ - private final Receiver<Integer, Integer, String> receiver = - new Receiver<Integer, Integer, String>(STMR, CONVERTER) { - @Override protected void receive() { - while (!isStopped()) { - try { - Thread.sleep(50); - } - catch (InterruptedException e) { - // No-op. - } - } - } - }; - - /** - * Tests receiver behavior in case of forced termination. - * - * @throws Exception If error occurred. - */ - public void testReceiver() throws Exception { - assertFalse(receiver.isStarted()); - assertFalse(receiver.isStopped()); - - receiver.start(); - - assertTrue(receiver.isStarted()); - assertFalse(receiver.isStopped()); - - // Wait for some period before stop. - Thread.sleep(500); - - receiver.stop(); - - assertFalse(receiver.isStarted()); - assertTrue(receiver.isStopped()); - - try { - receiver.start(); - fail("IllegalStateException expected."); - } - catch (IllegalStateException e) { - // No-op - } - - try { - receiver.stop(); - fail("IllegalStateException expected."); - } - catch (IllegalStateException e) { - // No-op - } - } - - /** - * Receiver stub. - * - * @param <K> Key type. - * @param <V> Value type. - */ - private static class DataStreamerStub<K, V> implements IgniteDataStreamer<K, V> { - /** {@inheritDoc} */ - @Override public String cacheName() { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean allowOverwrite() { - return false; - } - - /** {@inheritDoc} */ - @Override public void allowOverwrite(boolean allowOverwrite) throws IgniteException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean skipStore() { - return false; - } - - /** {@inheritDoc} */ - @Override public void skipStore(boolean skipStore) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public int perNodeBufferSize() { - return 0; - } - - /** {@inheritDoc} */ - @Override public void perNodeBufferSize(int bufSize) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public int perNodeParallelOperations() { - return 0; - } - - /** {@inheritDoc} */ - @Override public void perNodeParallelOperations(int parallelOps) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public long autoFlushFrequency() { - return 0; - } - - /** {@inheritDoc} */ - @Override public void autoFlushFrequency(long autoFlushFreq) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> future() { - return null; - } - - /** {@inheritDoc} */ - @Override public void deployClass(Class<?> depCls) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void receiver(org.apache.ignite.stream.StreamReceiver<K, V> rcvr) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> removeData(K key) throws IgniteException, IllegalStateException { - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IllegalStateException { - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IllegalStateException { - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) - throws IllegalStateException { - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException { - return null; - } - - /** {@inheritDoc} */ - @Override public void flush() throws IgniteException, IllegalStateException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void tryFlush() throws IgniteException, IllegalStateException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void close(boolean cancel) throws IgniteException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteException { - // No-op. - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7faf021..1a550fb 100644 --- a/pom.xml +++ b/pom.xml @@ -787,7 +787,7 @@ </group> <group> <title>Streaming APIs</title> - <packages>org.apache.ignite.streamer:org.apache.ignite.streamer.router:org.apache.ignite.streamer.window:org.apache.ignite.streamer.index:org.apache.ignite.streamer.index.hash:org.apache.ignite.streamer.index.tree:org.apache.ignite.stream:org.apache.ignite.streaming</packages> + <packages>org.apache.ignite.streamer:org.apache.ignite.streamer.router:org.apache.ignite.streamer.window:org.apache.ignite.streamer.index:org.apache.ignite.streamer.index.hash:org.apache.ignite.streamer.index.tree:org.apache.ignite.stream:org.apache.ignite.stream.socket</packages> </group> <group> <title>Security APIs</title> @@ -982,7 +982,7 @@ </group> <group> <title>Streaming APIs</title> - <packages>org.apache.ignite.streamer:org.apache.ignite.streamer.router:org.apache.ignite.streamer.window:org.apache.ignite.streamer.index:org.apache.ignite.streamer.index.hash:org.apache.ignite.streamer.index.tree:org.apache.ignite.stream:org.apache.ignite.streaming</packages> + <packages>org.apache.ignite.streamer:org.apache.ignite.streamer.router:org.apache.ignite.streamer.window:org.apache.ignite.streamer.index:org.apache.ignite.streamer.index.hash:org.apache.ignite.streamer.index.tree:org.apache.ignite.stream:org.apache.ignite.stream.socket</packages> </group> <group> <title>Security APIs</title>