Repository: incubator-ignite Updated Branches: refs/heads/ignite-430 9afb1973e -> cf9ddc5c5
ignite-430 IgniteSocketStreamer to stream data from TCP socket. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cf9ddc5c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cf9ddc5c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cf9ddc5c Branch: refs/heads/ignite-430 Commit: cf9ddc5c5f9c682003c14ad95f96f4f82d1a42e9 Parents: 9afb197 Author: Andrey Gura <ag...@gridgain.com> Authored: Thu Mar 26 20:38:04 2015 +0300 Committer: Andrey Gura <ag...@gridgain.com> Committed: Thu Mar 26 20:38:04 2015 +0300 ---------------------------------------------------------------------- examples/config/example-cache.xml | 4 +- .../streaming/SocketStreamerExample.java | 23 +-- .../streaming/TextSocketStreamerExample.java | 15 +- .../ignite/streaming/IgniteSocketStreamer.java | 76 +------ .../streaming/IgniteTextSocketStreamer.java | 32 ++- .../apache/ignite/streaming/StreamReceiver.java | 188 +++++++++++++++++ .../streaming/IgniteSocketStreamerTest.java | 9 +- .../streaming/IgniteTextSocketStreamerTest.java | 11 +- .../ignite/streaming/StreamReceiverTest.java | 201 +++++++++++++++++++ 9 files changed, 458 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/examples/config/example-cache.xml ---------------------------------------------------------------------- diff --git a/examples/config/example-cache.xml b/examples/config/example-cache.xml index 706f959..0bce70e 100644 --- a/examples/config/example-cache.xml +++ b/examples/config/example-cache.xml @@ -152,8 +152,8 @@ to our documentation: http://doc.gridgain.org/latest/Automatic+Node+Discovery --> <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. --> - <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">--> - <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> + <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">--> <property name="addresses"> <list> <!-- In distributed environment, replace with actual host IP address. --> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java index 04b7101..8ed4451 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java @@ -18,7 +18,6 @@ package org.apache.ignite.examples.streaming; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.examples.*; import org.apache.ignite.examples.datagrid.*; import org.apache.ignite.lang.*; @@ -91,24 +90,9 @@ public class SocketStreamerExample { IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String> sockStmr = new IgniteSocketStreamer<>(HOST, PORT, stmr, converter); - //sockStmr.loadData(); - - System.out.println(">>> Start streaming async"); - IgniteFuture<Void> fut = sockStmr.start(); - System.out.println(">>> Future obtained"); - - try { - fut.get(3000); - } - catch (IgniteFutureTimeoutException e) { - // No-op. - } - - sockStmr.stop(); - - System.out.println(">>> Future completed for " + fut.duration()); + fut.get(); } long end = System.currentTimeMillis(); @@ -131,11 +115,8 @@ public class SocketStreamerExample { ObjectOutputStream oos = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()))) { - for (int i = 0; i < ENTRY_COUNT; i++) { + for (int i = 0; i < ENTRY_COUNT; i++) oos.writeObject(new IgniteBiTuple<>(i, Integer.toString(i))); - if (i > 0 && i % 1000 == 0) - System.out.println(">>> " + i + " events sent."); - } } catch (IOException e) { // No-op. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java index d6c6daa..49de6cb 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java @@ -92,7 +92,20 @@ public class TextSocketStreamerExample { IgniteTextSocketStreamer<Integer, String> sockStmr = new IgniteTextSocketStreamer<>(HOST, PORT, stmr, converter); - sockStmr.loadData(); + IgniteFuture<Void> fut = sockStmr.start(); + + try { + fut.get(500); + } catch (IgniteFutureTimeoutException e) { + // No-op. + } + + //fut.get(); + + sockStmr.stop(); + + System.out.println(">>> Future done: " + fut.isDone()); + System.out.println(">>> Future canceled: " + fut.isCancelled()); } long end = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java index cce4570..e599cdd 100644 --- a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java @@ -18,13 +18,8 @@ package org.apache.ignite.streaming; import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.worker.*; import org.apache.ignite.lang.*; -import org.apache.ignite.logger.*; -import org.apache.ignite.thread.*; import java.io.*; import java.net.*; @@ -38,24 +33,13 @@ import java.util.*; * @param <K> Cache entry key type. * @param <V> Cache entry value type. */ -public class IgniteSocketStreamer<E, K, V> { - /** Logger. */ - private static final IgniteLogger log = new NullLogger(); - +public class IgniteSocketStreamer<E, K, V> extends StreamReceiver<E,K,V> { /** Host. */ private final String host; /** Port. */ private final int port; - private volatile GridWorker worker; - - /** Target streamer. */ - protected final IgniteDataStreamer<K, V> streamer; - - /** Stream to entries iterator transformer. */ - protected final IgniteClosure<E, Map.Entry<K, V>> converter; - /** * Constructs socket streamer. * @@ -70,20 +54,16 @@ public class IgniteSocketStreamer<E, K, V> { IgniteDataStreamer<K, V> streamer, IgniteClosure<E, Map.Entry<K, V>> converter ) { - A.notNull(streamer, "streamer is null"); + super(streamer, converter); + A.notNull(host, "host is null"); - A.notNull(converter, "converter is null"); this.host = host; this.port = port; - this.streamer = streamer; - this.converter = converter; } - /** - * Performs loading of data stream. - */ - public void loadData() { + /** {@inheritDoc} */ + @Override protected void loadData() { try (Socket sock = new Socket(host, port)) { loadData(sock); } @@ -92,41 +72,19 @@ public class IgniteSocketStreamer<E, K, V> { } } - public IgniteFuture<Void> start() { - - final GridFutureAdapter<Void> fut = new GridFutureAdapter<>(); - - ReceiverWorker worker = new ReceiverWorker( - "GRID???", "Socket streamer receiver", log, new GridWorkerListenerAdapter() { - @Override public void onStopped(GridWorker w) { - fut.onDone(); - } - }); - - this.worker = worker; - - new IgniteThread(worker).start(); - - return new IgniteFutureImpl<>(fut); - } - - public void stop() { - worker.cancel(); - } - /** * Reads data from socket and loads them into target data stream. * * @param sock Socket. */ @SuppressWarnings("unchecked") - protected void loadData(Socket sock) throws IOException { + private void loadData(Socket sock) throws IOException { try (ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()))) { - while (true) { + while (!isStopped()) { try { E element = (E) ois.readObject(); - streamer.addData(converter.apply(element)); + addData(element); } catch (EOFException e) { break; @@ -137,22 +95,4 @@ public class IgniteSocketStreamer<E, K, V> { } } } - - private class ReceiverWorker extends GridWorker { - - private final GridWorkerListener lsnr; - - protected ReceiverWorker(String gridName, String name, IgniteLogger log, GridWorkerListener lsnr) { - super(gridName, name, log, lsnr); - this.lsnr = lsnr; - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - lsnr.onStarted(this); - loadData(); - lsnr.onStopped(this); - } - } - } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java index 6faaeb6..a43d26b 100644 --- a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java @@ -18,6 +18,7 @@ package org.apache.ignite.streaming; import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import java.io.*; @@ -31,7 +32,13 @@ import java.util.*; * @param <K> Cache entry key type. * @param <V> Cache entry value type. */ -public class IgniteTextSocketStreamer<K, V> extends IgniteSocketStreamer<String, K, V> { +public class IgniteTextSocketStreamer<K, V> extends StreamReceiver<String, K, V> { + /** Host. */ + private final String host; + + /** Port. */ + private final int port; + /** * Constructs text socket streamer. * @@ -46,16 +53,31 @@ public class IgniteTextSocketStreamer<K, V> extends IgniteSocketStreamer<String, IgniteDataStreamer<K, V> streamer, IgniteClosure<String, Map.Entry<K, V>> converter ) { - super(host, port, streamer, converter); + super(streamer, converter); + + A.notNull(host, "host is null"); + + this.host = host; + this.port = port; + } + + /** {@inheritDoc} */ + @Override protected void loadData() { + try (Socket sock = new Socket(host, port)) { + loadData(sock); + } + catch (Exception e) { + throw new IgniteException(e); + } } /** {@inheritDoc} */ - @Override protected void loadData(Socket sock) throws IOException { + private void loadData(Socket sock) throws IOException { try (BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), "UTF-8"))) { String val; - while ((val = reader.readLine()) != null) - streamer.addData(converter.apply(val)); + while (!isStopped() && (val = reader.readLine()) != null) + addData(val); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java b/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java new file mode 100644 index 0000000..07d6775 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.streaming; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Base implementation of stream receiver. + * + * @param <E> + * @param <K> + * @param <V> + */ +public abstract class StreamReceiver<E, K, V> { + /** Object monitor. */ + private final Object lock = new Object(); + + /** Stop latch. */ + private final CountDownLatch stopLatch = new CountDownLatch(1); + + /** State. */ + private volatile State state = State.INITIALIZED; + + /** Target streamer. */ + private final IgniteDataStreamer<K, V> streamer; + + /** Element to entries transformer. */ + private final IgniteClosure<E, Map.Entry<K, V>> converter; + + /** + * Constructs stream receiver. + * + * @param streamer Streamer. + * @param converter Element to entries transformer. + */ + public StreamReceiver(IgniteDataStreamer<K, V> streamer, IgniteClosure<E, Map.Entry<K, V>> converter) { + A.notNull(streamer, "streamer is null"); + A.notNull(converter, "converter is null"); + + this.streamer = streamer; + this.converter = converter; + } + + /** + * Starts streamer. + */ + public IgniteFuture<Void> start() { + synchronized (lock) { + if (state != State.INITIALIZED) + throw new IllegalStateException("Receiver in " + state + " state can't be started."); + + GridFutureAdapter<Void> fut = new GridFutureAdapter<>(); + + new Thread(new Receiver(fut)).start(); + + state = State.STARTED; + + return new IgniteFutureImpl<>(fut); + } + } + + /** + * Stops streamer. + */ + public void stop() { + synchronized (lock) { + if (state != State.STARTED) + throw new IllegalStateException("Receiver in " + state + " state can't be stopped."); + + state = State.STOPPED; + + try { + stopLatch.await(); + } catch (InterruptedException e) { + // No-op. + } + } + } + + /** + * Returns stream receiver state. + * + * @return stream receiver state. + */ + public State state() { + return state; + } + + /** + * Checks whether receiver is started or not. + * + * @return {@code True} if receiver is started, {@code false} - otherwise. + */ + public boolean isStarted() { + return state == State.STARTED; + } + + /** + * Checks whether receiver is stopped or not. + * + * @return {@code True} if receiver is stopped, {@code false} - otherwise. + */ + public boolean isStopped() { + return state == State.STOPPED; + } + + /** + * Performs actual loading of data. Override this method in order to implement own data loading functionality. + */ + protected abstract void loadData(); + + /** + * Convert stream data to cache entry and transfer it to the target streamer. + * + * @param element Element. + */ + protected void addData(E element) { + streamer.addData(converter.apply(element)); + } + + /** + * Receiver state. + */ + public enum State { + /** New. */ + INITIALIZED, + /** Started. */ + STARTED, + /** Stopped. */ + STOPPED + } + + /** + * Receiver worker that actually receives data from socket. + */ + private class Receiver implements Runnable { + /** Future. */ + private final GridFutureAdapter<Void> fut; + + /** + * @param fut Future. + */ + public Receiver(GridFutureAdapter<Void> fut) { + this.fut = fut; + } + + /** {@inheritDoc} */ + @Override public void run() { + Throwable err = null; + + try { + loadData(); + } + catch (Throwable e) { + err = e; + } + finally { + if (state == State.STOPPED) + fut.onCancelled(); + else + fut.onDone(null, err); + + stopLatch.countDown(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java index 1bb0088..a88c214 100644 --- a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java @@ -60,7 +60,7 @@ public class IgniteSocketStreamerTest extends GridCommonAbstractTest { /** * Tests data loading. */ - public void testLoadData() throws Exception { + public void testStreamer() throws Exception { try (Ignite g = startGrid()) { IgniteCache<Integer, String> cache = g.jcache(null); @@ -81,7 +81,12 @@ public class IgniteSocketStreamerTest extends GridCommonAbstractTest { IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String> sockStmr = new IgniteSocketStreamer<>(HOST, PORT, stmr, converter); - sockStmr.loadData(); + IgniteFuture<Void> fut = sockStmr.start(); + + fut.get(); + + assertTrue(fut.isDone()); + assertFalse(fut.isCancelled()); } assertEquals(ENTRY_CNT, cache.size()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java index 34318c8..7ba2b13 100644 --- a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java @@ -43,7 +43,7 @@ public class IgniteTextSocketStreamerTest extends GridCommonAbstractTest { /** * Tests data loading. */ - public void testLoadData() throws Exception { + public void testStream() throws Exception { try (Ignite g = startGrid()) { IgniteCache<Integer, String> cache = g.jcache(null); @@ -65,7 +65,14 @@ public class IgniteTextSocketStreamerTest extends GridCommonAbstractTest { IgniteTextSocketStreamer<Integer, String> sockStmr = new IgniteTextSocketStreamer<>(HOST, PORT, stmr, converter); - sockStmr.loadData(); + IgniteFuture<Void> fut = sockStmr.start(); + + fut.get(); + + System.out.println(">>> STATE " + sockStmr.state()); + + assertTrue(fut.isDone()); + assertFalse(fut.isCancelled()); } assertEquals(ENTRY_CNT, cache.size()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf9ddc5c/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java new file mode 100644 index 0000000..fd414c6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java @@ -0,0 +1,201 @@ +package org.apache.ignite.streaming; + +import junit.framework.TestCase; +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * + */ +public class StreamReceiverTest extends TestCase { + + private static final IgniteClosure<Integer, Map.Entry<Integer, String>> CONVERTER = + new IgniteClosure<Integer, Map.Entry<Integer, String>>() { + @Override public Map.Entry<Integer, String> apply(Integer input) { + return new IgniteBiTuple<>(input, input.toString()); + } + }; + + private static final IgniteDataStreamer<Integer, String> STMR = new DataStreamerStub<>(); + + private volatile boolean finished = false; + + public void testName() throws Exception { + StreamReceiver<Integer, Integer, String> receiver = + new StreamReceiver<Integer, Integer, String>(STMR, CONVERTER) { + @Override protected void loadData() { + while (!isStopped() && !finished) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + }; + + assertEquals(StreamReceiver.State.INITIALIZED, receiver.state()); + assertFalse(receiver.isStarted()); + assertFalse(receiver.isStopped()); + + IgniteFuture<Void> fut = receiver.start(); + + assertEquals(StreamReceiver.State.STARTED, receiver.state()); + + assertTrue(receiver.isStarted()); + assertFalse(receiver.isStopped()); + + assertFalse(fut.isDone()); + assertFalse(fut.isCancelled()); + + try { + fut.get(500); + } + catch (IgniteException e) { + // No-op. + } + + assertEquals(StreamReceiver.State.STARTED, receiver.state()); + assertTrue(receiver.isStarted()); + assertFalse(receiver.isStopped()); + + assertFalse(fut.isDone()); + assertFalse(fut.isCancelled()); + + //finished = true; + receiver.stop(); + + fut.get(); + + assertEquals(StreamReceiver.State.STOPPED, receiver.state()); + + assertFalse(receiver.isStarted()); + assertTrue(receiver.isStopped()); + + assertTrue(fut.isDone()); + assertFalse(fut.isCancelled()); + + } + + private static class DataStreamerStub<K, V> implements IgniteDataStreamer<K, V> { + + /** {@inheritDoc} */ + @Override public String cacheName() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean allowOverwrite() { + return false; + } + + /** {@inheritDoc} */ + @Override public void allowOverwrite(boolean allowOverwrite) throws IgniteException { + + } + + /** {@inheritDoc} */ + @Override public boolean skipStore() { + return false; + } + + /** {@inheritDoc} */ + @Override public void skipStore(boolean skipStore) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int perNodeBufferSize() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void perNodeBufferSize(int bufSize) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int perNodeParallelOperations() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void perNodeParallelOperations(int parallelOps) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public long autoFlushFrequency() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void autoFlushFrequency(long autoFlushFreq) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> future() { + return null; + } + + /** {@inheritDoc} */ + @Override public void deployClass(Class<?> depCls) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void updater(Updater<K, V> updater) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> removeData(K key) throws IgniteException, IllegalStateException { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IllegalStateException { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IllegalStateException { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) + throws IllegalStateException { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException { + return null; + } + + /** {@inheritDoc} */ + @Override public void flush() throws IgniteException, IllegalStateException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void tryFlush() throws IgniteException, IllegalStateException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void close(boolean cancel) throws IgniteException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteException { + // No-op. + } + } +} \ No newline at end of file