# ignite-45 - fixing streaming.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/888f0a3d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/888f0a3d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/888f0a3d Branch: refs/heads/ignite-501 Commit: 888f0a3d1c9d093bc1ea471b0462b4484d34d09c Parents: 934d4e0 Author: Dmitiry Setrakyan <dsetrak...@gridgain.com> Authored: Thu Mar 19 00:40:49 2015 -0400 Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com> Committed: Thu Mar 19 00:40:49 2015 -0400 ---------------------------------------------------------------------- examples/config/example-streamer.xml | 18 +- .../datagrid/CachePopularNumbersExample.java | 7 +- .../StreamingPopularNumbersExample.java | 113 +++++-------- .../apache/ignite/examples/ComputeExample.java | 57 ------- .../ignite/examples/MessagingExample.java | 162 ------------------ .../ignite/examples/java8/ComputeExample.java | 56 +++++++ .../examples/java8/ExampleNodeStartup.java | 35 ++++ .../ignite/examples/java8/ExamplesUtils.java | 99 +++++++++++ .../ignite/examples/java8/MessagingExample.java | 162 ++++++++++++++++++ .../StreamingPopularNumbersExample.java | 163 +++++++++++++++++++ .../examples/ScalarCacheAffinityExample.scala | 2 +- .../scalar/examples/ScalarCacheExample.scala | 2 +- .../ScalarCachePopularNumbersExample.scala | 23 +-- .../examples/ScalarCacheQueryExample.scala | 9 +- .../examples/ScalarSnowflakeSchemaExample.scala | 10 +- .../examples/MessagingExamplesSelfTest.java | 2 +- .../org/apache/ignite/IgniteDataStreamer.java | 32 +--- .../processors/cache/GridCacheAdapter.java | 5 +- .../GridDistributedCacheAdapter.java | 2 +- .../datastreamer/DataStreamProcessor.java | 3 +- .../datastreamer/DataStreamerCacheUpdaters.java | 25 +-- .../datastreamer/DataStreamerImpl.java | 33 ++-- .../datastreamer/DataStreamerRequest.java | 2 +- .../datastreamer/DataStreamerUpdateJob.java | 17 +- .../dr/IgniteDrDataStreamerCacheUpdater.java | 9 +- .../processors/igfs/IgfsDataManager.java | 3 +- .../apache/ignite/stream/StreamReceiver.java | 42 +++++ .../apache/ignite/stream/StreamTransformer.java | 41 +++++ ...cheAbstractFullApiMultithreadedSelfTest.java | 2 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 4 +- ...idCachePartitionedHitsAndMissesSelfTest.java | 7 +- .../DataStreamProcessorSelfTest.java | 12 +- .../IgniteDataStreamerPerformanceTest.java | 2 +- .../ignite/loadtests/streamer/IndexUpdater.java | 2 +- .../tests/p2p/GridExternalAffinityFunction.java | 2 +- 35 files changed, 753 insertions(+), 412 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/config/example-streamer.xml ---------------------------------------------------------------------- diff --git a/examples/config/example-streamer.xml b/examples/config/example-streamer.xml index 42790d9..e6a3a98 100644 --- a/examples/config/example-streamer.xml +++ b/examples/config/example-streamer.xml @@ -77,7 +77,7 @@ <property name="stages"> <list> - <bean class="org.apache.ignite.examples.streaming.StreamingRunningAverageExample$StreamerStage"/> + <bean class="org.apache.ignite.examples.java8.streaming.StreamingRunningAverageExample$StreamerStage"/> </list> </property> @@ -120,7 +120,7 @@ <list> <bean class="org.apache.ignite.streamer.index.tree.StreamerTreeIndexProvider"> <property name="updater"> - <bean class="org.apache.ignite.examples.streaming.StreamingPopularNumbersExample$IndexUpdater"/> + <bean class="org.apache.ignite.examples.java8.streaming.StreamingPopularNumbersExample$IndexUpdater"/> </property> </bean> </list> @@ -130,7 +130,7 @@ <property name="stages"> <list> - <bean class="org.apache.ignite.examples.streaming.StreamingPopularNumbersExample$StreamerStage"/> + <bean class="org.apache.ignite.examples.java8.streaming.StreamingPopularNumbersExample$StreamerStage"/> </list> </property> @@ -191,8 +191,8 @@ <property name="stages"> <list> - <bean class="org.apache.ignite.examples.streaming.StreamingPriceBarsExample$FirstStage"/> - <bean class="org.apache.ignite.examples.streaming.StreamingPriceBarsExample$SecondStage"/> + <bean class="org.apache.ignite.examples.java8.streaming.StreamingPriceBarsExample$FirstStage"/> + <bean class="org.apache.ignite.examples.java8.streaming.StreamingPriceBarsExample$SecondStage"/> </list> </property> @@ -232,7 +232,7 @@ <property name="unique" value="true"/> <property name="updater"> - <bean class="org.apache.ignite.examples.streaming.StreamingCheckInExample$CheckInEventIndexUpdater"/> + <bean class="org.apache.ignite.examples.java8.streaming.StreamingCheckInExample$CheckInEventIndexUpdater"/> </property> </bean> </list> @@ -248,7 +248,7 @@ <property name="unique" value="true"/> <property name="updater"> - <bean class="org.apache.ignite.examples.streaming.StreamingCheckInExample$PlacesIndexUpdater"/> + <bean class="org.apache.ignite.examples.java8.streaming.StreamingCheckInExample$PlacesIndexUpdater"/> </property> </bean> </list> @@ -259,8 +259,8 @@ <property name="stages"> <list> - <bean class="org.apache.ignite.examples.streaming.StreamingCheckInExample$AddToWindowStage"/> - <bean class="org.apache.ignite.examples.streaming.StreamingCheckInExample$DetectPlacesStage"/> + <bean class="org.apache.ignite.examples.java8.streaming.StreamingCheckInExample$AddToWindowStage"/> + <bean class="org.apache.ignite.examples.java8.streaming.StreamingCheckInExample$DetectPlacesStage"/> </list> </property> </bean> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java index 0b5fa15..47b83ac 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java @@ -23,6 +23,7 @@ import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.examples.*; +import org.apache.ignite.stream.*; import javax.cache.processor.*; import java.util.*; @@ -105,7 +106,7 @@ public class CachePopularNumbersExample { // Set larger per-node buffer size since our state is relatively small. stmr.perNodeBufferSize(2048); - stmr.updater(new IncrementingUpdater()); + stmr.receiver(new IncrementingUpdater()); for (int i = 0; i < CNT; i++) stmr.addData(RAND.nextInt(RANGE), 1L); @@ -150,7 +151,7 @@ public class CachePopularNumbersExample { /** * Increments value for key. */ - private static class IncrementingUpdater implements IgniteDataStreamer.Updater<Integer, Long> { + private static class IncrementingUpdater implements StreamReceiver<Integer, Long> { /** Process entries to increase value by entry key. */ private static final EntryProcessor<Integer, Long, ?> INC = new EntryProcessor<Integer, Long, Object>() { @Override public Object process(MutableEntry<Integer, Long> e, Object... args) { @@ -163,7 +164,7 @@ public class CachePopularNumbersExample { }; /** {@inheritDoc} */ - @Override public void update(IgniteCache<Integer, Long> cache, Collection<Map.Entry<Integer, Long>> entries) { + @Override public void receive(IgniteCache<Integer, Long> cache, Collection<Map.Entry<Integer, Long>> entries) { for (Map.Entry<Integer, Long> entry : entries) cache.invoke(entry.getKey(), INC); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java index 847debe..29cc2c1 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java @@ -21,12 +21,14 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.stream.*; import javax.cache.*; import javax.cache.configuration.*; import javax.cache.expiry.*; import javax.cache.processor.*; import java.util.*; +import java.util.concurrent.*; import static java.util.concurrent.TimeUnit.*; @@ -40,30 +42,20 @@ import static java.util.concurrent.TimeUnit.*; * start node with {@code examples/config/example-compute.xml} configuration. */ public class StreamingPopularNumbersExample { - /** - * Cache name. - */ + /** Cache name. */ private static final String STREAM_NAME = StreamingPopularNumbersExample.class.getSimpleName(); - /** - * Count of most popular numbers to retrieve from cluster. - */ - private static final int POPULAR_NUMBERS_CNT = 10; - - /** - * Random number generator. - */ + /** Random number generator. */ private static final Random RAND = new Random(); - /** - * Range within which to generate numbers. - */ + /** Range within which to generate numbers. */ private static final int RANGE = 1000; - /** - * Count of total numbers to generate. - */ - private static final int CNT = 1000000; + /** Test duration. */ + private static final long DURATION = 2 * 60 * 1000; + + /** Flag indicating that the test is finished. */ + private static volatile boolean finished = false; /** * Executes example. @@ -99,36 +91,19 @@ public class StreamingPopularNumbersExample { try (IgniteCache<Integer, Long> stmCache = ignite.createCache(cfg)) { // Check that that server nodes have been started. if (ignite.cluster().forDataNodes(STREAM_NAME).nodes().isEmpty()) { - System.out.println("Ignite does not have streaming cache configured: " + STREAM_NAME); + System.err.println("Data nodes not found (start data nodes with ExampleNodeStartup class)"); return; } - // Stream random numbers from another thread. - Thread th = new Thread(new Runnable() { - @Override - public void run() { - try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(STREAM_NAME)) { - stmr.allowOverwrite(true); // Allow data updates. - stmr.updater(new IncrementingUpdater()); - - while (!Thread.currentThread().isInterrupted()) - stmr.addData(RAND.nextInt(RANGE), 1L); - } - } - }); - - th.start(); - - // Run this example for 2 minutes. - long duration = 2 * 60 * 1000; + ExecutorService exe = startStreaming(ignite); long start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start < duration) { + while (System.currentTimeMillis() - start < DURATION) { // Select top 10 words. SqlFieldsQuery top10 = new SqlFieldsQuery( - "select _key, _val from Long order by _val desc, _key limit ?").setArgs(10); + "select _key, _val from Long order by _val desc limit 10"); List<List<?>> results = stmCache.queryFields(top10).getAll(); @@ -140,8 +115,9 @@ public class StreamingPopularNumbersExample { Thread.sleep(5000); } - th.interrupt(); - th.join(); + finished = true; + + exe.shutdown(); } catch (CacheException e) { e.printStackTrace(); @@ -157,38 +133,35 @@ public class StreamingPopularNumbersExample { * Populates cache in real time with numbers and keeps count for every number. * * @param ignite Ignite. - * @throws org.apache.ignite.IgniteException If failed. */ - private static void streamData(final Ignite ignite) throws IgniteException { - try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(STREAM_NAME)) { - stmr.allowOverwrite(true); // Allow data updates. - stmr.updater(new IncrementingUpdater()); + private static ExecutorService startStreaming(final Ignite ignite) { + ExecutorService exe = Executors.newSingleThreadExecutor(); - while (!Thread.currentThread().isInterrupted()) - stmr.addData(RAND.nextInt(RANGE), 1L); - } - catch (IgniteInterruptedException ignore) {} - } + // Stream random numbers from another thread. + exe.submit(new Runnable() { + @Override public void run() { + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(STREAM_NAME)) { + // Allow data updates. + stmr.allowOverwrite(true); - /** - * Increments value for key. - */ - private static class IncrementingUpdater implements IgniteDataStreamer.Updater<Integer, Long> { - @Override - public void update(IgniteCache<Integer, Long> cache, Collection<Map.Entry<Integer, Long>> entries) { - for (Map.Entry<Integer, Long> entry : entries) { - // Increment values by 1. - cache.invoke(entry.getKey(), new EntryProcessor<Integer, Long, Object>() { - @Override - public Object process(MutableEntry<Integer, Long> e, Object... args) { - Long val = e.getValue(); - - e.setValue(val == null ? 1L : val + 1); - - return null; - } - }); + // Transform data when processing. + stmr.receiver(new StreamTransformer<>(new EntryProcessor<Integer, Long, Object>() { + @Override + public Object process(MutableEntry<Integer, Long> e, Object... args) { + Long val = e.getValue(); + + e.setValue(val == null ? 1L : val + 1); + + return null; + } + })); + + while (!finished) + stmr.addData(RAND.nextInt(RANGE), 1L); + } } - } + }); + + return exe; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java8/org/apache/ignite/examples/ComputeExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/ComputeExample.java b/examples/src/main/java8/org/apache/ignite/examples/ComputeExample.java deleted file mode 100644 index f161777..0000000 --- a/examples/src/main/java8/org/apache/ignite/examples/ComputeExample.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; - -/** - * Demonstrates broadcasting and unicasting computations within cluster. - * <p> - * Remote nodes should always be started with special configuration file which - * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-compute.xml'}. - */ -public class ComputeExample { - /** - * Executes example. - * - * @param args Command line arguments, none required. - * @throws IgniteException If example execution failed. - */ - public static void main(String[] args) throws IgniteException { - try (Ignite ignite = Ignition.start("examples/config/example-compute.xml")) { - System.out.println(); - System.out.println(">>> Compute broadcast example started."); - - // Broadcast closure to all cluster nodes. - ignite.compute().broadcast(() -> System.out.println("Hello World")); - - // Unicast closure to some cluster node picked by load balancer. - ignite.compute().run(() -> System.out.println("Hello World")); - - // Unicast closure to some cluster node picked by load balancer and return result. - int length = ignite.compute().call("Hello World"::length); - - System.out.println(); - System.out.println(">>> Computed length: " + length); - - System.out.println(); - System.out.println(">>> Check all nodes for hello message output."); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java8/org/apache/ignite/examples/MessagingExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/MessagingExample.java b/examples/src/main/java8/org/apache/ignite/examples/MessagingExample.java deleted file mode 100644 index 97b46a5..0000000 --- a/examples/src/main/java8/org/apache/ignite/examples/MessagingExample.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; - -import java.util.concurrent.*; - -/** - * Example that demonstrates how to exchange messages between nodes. Use such - * functionality for cases when you need to communicate to other nodes outside - * of ignite task. - * <p> - * To run this example you must have at least one remote node started. - * <p> - * Remote nodes should always be started with special configuration file which - * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-compute.xml'}. - * <p> - * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node - * with {@code examples/config/example-compute.xml} configuration. - */ -public final class MessagingExample { - /** Number of messages. */ - private static final int MESSAGES_NUM = 10; - - /** Message topics. */ - private enum TOPIC { ORDERED, UNORDERED } - - /** - * Executes example. - * - * @param args Command line arguments, none required. - * @throws IgniteException If example execution failed. - */ - public static void main(String[] args) throws Exception { - try (Ignite ignite = Ignition.start("examples/config/example-compute.xml")) { - if (!ExamplesUtils.checkMinTopologySize(ignite.cluster(), 2)) { - System.out.println(); - System.out.println(">>> Please start at least 2 cluster nodes to run example."); - System.out.println(); - - return; - } - - System.out.println(); - System.out.println(">>> Messaging example started."); - - // Group for remote nodes. - ClusterGroup rmtGrp = ignite.cluster().forRemotes(); - - // Listen for messages from remote nodes to make sure that they received all the messages. - int msgCnt = rmtGrp.nodes().size() * MESSAGES_NUM; - - CountDownLatch orderedLatch = new CountDownLatch(msgCnt); - CountDownLatch unorderedLatch = new CountDownLatch(msgCnt); - - localListen(ignite.message(ignite.cluster().forLocal()), orderedLatch, unorderedLatch); - - // Register listeners on all cluster nodes. - startListening(ignite, ignite.message(rmtGrp)); - - // Send unordered messages to all remote nodes. - for (int i = 0; i < MESSAGES_NUM; i++) - ignite.message(rmtGrp).send(TOPIC.UNORDERED, Integer.toString(i)); - - System.out.println(">>> Finished sending unordered messages."); - - // Send ordered messages to all remote nodes. - for (int i = 0; i < MESSAGES_NUM; i++) - ignite.message(rmtGrp).sendOrdered(TOPIC.ORDERED, Integer.toString(i), 0); - - System.out.println(">>> Finished sending ordered messages."); - System.out.println(">>> Check output on all nodes for message printouts."); - System.out.println(">>> Will wait for messages acknowledgements from all remote nodes."); - - orderedLatch.await(); - unorderedLatch.await(); - - System.out.println(">>> Messaging example finished."); - } - } - - /** - * Start listening to messages on remote cluster nodes. - * - * @param ignite Ignite. - * @param imsg Ignite messaging. - * @throws IgniteException If failed. - */ - private static void startListening(final Ignite ignite, IgniteMessaging imsg) throws IgniteException { - // Add ordered message listener. - imsg.remoteListen(TOPIC.ORDERED, (nodeId, msg) -> { - System.out.println("Received ordered message [msg=" + msg + ", fromNodeId=" + nodeId + ']'); - - try { - ignite.message(ignite.cluster().forNodeId(nodeId)).send(TOPIC.ORDERED, msg); - } - catch (IgniteException e) { - e.printStackTrace(); - } - - return true; // Return true to continue listening. - }); - - // Add unordered message listener. - imsg.remoteListen(TOPIC.UNORDERED, (nodeId, msg) -> { - System.out.println("Received unordered message [msg=" + msg + ", fromNodeId=" + nodeId + ']'); - - try { - ignite.message(ignite.cluster().forNodeId(nodeId)).send(TOPIC.UNORDERED, msg); - } - catch (IgniteException e) { - e.printStackTrace(); - } - - return true; // Return true to continue listening. - }); - } - - /** - * Listen for messages from remote nodes. - * - * @param imsg Ignite messaging. - * @param orderedLatch Latch for ordered messages acks. - * @param unorderedLatch Latch for unordered messages acks. - */ - private static void localListen( - IgniteMessaging imsg, - final CountDownLatch orderedLatch, - final CountDownLatch unorderedLatch - ) { - imsg.localListen(TOPIC.ORDERED, (nodeId, msg) -> { - orderedLatch.countDown(); - - // Return true to continue listening, false to stop. - return orderedLatch.getCount() > 0; - }); - - imsg.localListen(TOPIC.UNORDERED, (nodeId, msg) -> { - unorderedLatch.countDown(); - - // Return true to continue listening, false to stop. - return unorderedLatch.getCount() > 0; - }); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java8/org/apache/ignite/examples/java8/ComputeExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/ComputeExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/ComputeExample.java new file mode 100644 index 0000000..557d65c --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/ComputeExample.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8; + +import org.apache.ignite.*; + +/** + * Demonstrates broadcasting and unicasting computations within cluster. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-compute.xml'}. + */ +public class ComputeExample { + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-compute.xml")) { + System.out.println(); + System.out.println(">>> Compute broadcast example started."); + + // Broadcast closure to all cluster nodes. + ignite.compute().broadcast(() -> System.out.println("Hello World")); + + // Unicast closure to some cluster node picked by load balancer. + ignite.compute().run(() -> System.out.println("Hello World")); + + // Unicast closure to some cluster node picked by load balancer and return result. + int length = ignite.compute().call("Hello World"::length); + + System.out.println(); + System.out.println(">>> Computed length: " + length); + + System.out.println(); + System.out.println(">>> Check all nodes for hello message output."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java8/org/apache/ignite/examples/java8/ExampleNodeStartup.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/ExampleNodeStartup.java b/examples/src/main/java8/org/apache/ignite/examples/java8/ExampleNodeStartup.java new file mode 100644 index 0000000..6f5ba93 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/ExampleNodeStartup.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8; + +import org.apache.ignite.*; + +/** + * Starts up an empty node with example compute configuration. + */ +public class ExampleNodeStartup { + /** + * Start up an empty node with example compute configuration. + * + * @param args Command line arguments, none required. + * @throws IgniteException If failed. + */ + public static void main(String[] args) throws IgniteException { + Ignition.start("examples/config/example-compute.xml"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java new file mode 100644 index 0000000..5b62bd4 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.streamer.*; + +import java.net.*; + +/** + * + */ +public class ExamplesUtils { + /** */ + private static final ClassLoader CLS_LDR = ExamplesUtils.class.getClassLoader(); + + /** + * Exits with code {@code -1} if maximum memory is below 90% of minimally allowed threshold. + * + * @param min Minimum memory threshold. + */ + public static void checkMinMemory(long min) { + long maxMem = Runtime.getRuntime().maxMemory(); + + if (maxMem < .85 * min) { + System.err.println("Heap limit is too low (" + (maxMem / (1024 * 1024)) + + "MB), please increase heap size at least up to " + (min / (1024 * 1024)) + "MB."); + + System.exit(-1); + } + } + + /** + * Returns URL resolved by class loader for classes in examples project. + * + * @return Resolved URL. + */ + public static URL url(String path) { + URL url = CLS_LDR.getResource(path); + + if (url == null) + throw new RuntimeException("Failed to resolve resource URL by path: " + path); + + return url; + } + + /** + * Checks minimum topology size for running a certain example. + * + * @param prj Cluster to check size for. + * @param size Minimum number of nodes required to run a certain example. + * @return {@code True} if check passed, {@code false} otherwise. + */ + public static boolean checkMinTopologySize(ClusterGroup prj, int size) { + int prjSize = prj.nodes().size(); + + if (prjSize < size) { + System.out.println(); + System.out.println(">>> Please start at least " + size + " cluster nodes to run example."); + System.out.println(); + + return false; + } + + return true; + } + + /** + * @param ignite Ignite. + * @param name Streamer name. + * @return {@code True} if ignite has streamer with given name. + */ + public static boolean hasStreamer(Ignite ignite, String name) { + if (ignite.configuration().getStreamerConfiguration() != null) { + for (StreamerConfiguration cfg : ignite.configuration().getStreamerConfiguration()) { + if (name.equals(cfg.getName())) + return true; + } + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java8/org/apache/ignite/examples/java8/MessagingExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/MessagingExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/MessagingExample.java new file mode 100644 index 0000000..621c6d4 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/MessagingExample.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; + +import java.util.concurrent.*; + +/** + * Example that demonstrates how to exchange messages between nodes. Use such + * functionality for cases when you need to communicate to other nodes outside + * of ignite task. + * <p> + * To run this example you must have at least one remote node started. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-compute.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node + * with {@code examples/config/example-compute.xml} configuration. + */ +public final class MessagingExample { + /** Number of messages. */ + private static final int MESSAGES_NUM = 10; + + /** Message topics. */ + private enum TOPIC { ORDERED, UNORDERED } + + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws Exception { + try (Ignite ignite = Ignition.start("examples/config/example-compute.xml")) { + if (!ExamplesUtils.checkMinTopologySize(ignite.cluster(), 2)) { + System.out.println(); + System.out.println(">>> Please start at least 2 cluster nodes to run example."); + System.out.println(); + + return; + } + + System.out.println(); + System.out.println(">>> Messaging example started."); + + // Group for remote nodes. + ClusterGroup rmtGrp = ignite.cluster().forRemotes(); + + // Listen for messages from remote nodes to make sure that they received all the messages. + int msgCnt = rmtGrp.nodes().size() * MESSAGES_NUM; + + CountDownLatch orderedLatch = new CountDownLatch(msgCnt); + CountDownLatch unorderedLatch = new CountDownLatch(msgCnt); + + localListen(ignite.message(ignite.cluster().forLocal()), orderedLatch, unorderedLatch); + + // Register listeners on all cluster nodes. + startListening(ignite, ignite.message(rmtGrp)); + + // Send unordered messages to all remote nodes. + for (int i = 0; i < MESSAGES_NUM; i++) + ignite.message(rmtGrp).send(TOPIC.UNORDERED, Integer.toString(i)); + + System.out.println(">>> Finished sending unordered messages."); + + // Send ordered messages to all remote nodes. + for (int i = 0; i < MESSAGES_NUM; i++) + ignite.message(rmtGrp).sendOrdered(TOPIC.ORDERED, Integer.toString(i), 0); + + System.out.println(">>> Finished sending ordered messages."); + System.out.println(">>> Check output on all nodes for message printouts."); + System.out.println(">>> Will wait for messages acknowledgements from all remote nodes."); + + orderedLatch.await(); + unorderedLatch.await(); + + System.out.println(">>> Messaging example finished."); + } + } + + /** + * Start listening to messages on remote cluster nodes. + * + * @param ignite Ignite. + * @param imsg Ignite messaging. + * @throws IgniteException If failed. + */ + private static void startListening(final Ignite ignite, IgniteMessaging imsg) throws IgniteException { + // Add ordered message listener. + imsg.remoteListen(TOPIC.ORDERED, (nodeId, msg) -> { + System.out.println("Received ordered message [msg=" + msg + ", fromNodeId=" + nodeId + ']'); + + try { + ignite.message(ignite.cluster().forNodeId(nodeId)).send(TOPIC.ORDERED, msg); + } + catch (IgniteException e) { + e.printStackTrace(); + } + + return true; // Return true to continue listening. + }); + + // Add unordered message listener. + imsg.remoteListen(TOPIC.UNORDERED, (nodeId, msg) -> { + System.out.println("Received unordered message [msg=" + msg + ", fromNodeId=" + nodeId + ']'); + + try { + ignite.message(ignite.cluster().forNodeId(nodeId)).send(TOPIC.UNORDERED, msg); + } + catch (IgniteException e) { + e.printStackTrace(); + } + + return true; // Return true to continue listening. + }); + } + + /** + * Listen for messages from remote nodes. + * + * @param imsg Ignite messaging. + * @param orderedLatch Latch for ordered messages acks. + * @param unorderedLatch Latch for unordered messages acks. + */ + private static void localListen( + IgniteMessaging imsg, + final CountDownLatch orderedLatch, + final CountDownLatch unorderedLatch + ) { + imsg.localListen(TOPIC.ORDERED, (nodeId, msg) -> { + orderedLatch.countDown(); + + // Return true to continue listening, false to stop. + return orderedLatch.getCount() > 0; + }); + + imsg.localListen(TOPIC.UNORDERED, (nodeId, msg) -> { + unorderedLatch.countDown(); + + // Return true to continue listening, false to stop. + return unorderedLatch.getCount() > 0; + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamingPopularNumbersExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamingPopularNumbersExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamingPopularNumbersExample.java new file mode 100644 index 0000000..3b33402 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamingPopularNumbersExample.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.streaming; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.examples.java8.*; +import org.apache.ignite.stream.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import javax.cache.expiry.*; +import java.util.*; +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.*; + +/** + * Real time popular numbers counter. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-compute.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will + * start node with {@code examples/config/example-compute.xml} configuration. + */ +public class StreamingPopularNumbersExample { + /** Cache name. */ + private static final String STREAM_NAME = StreamingPopularNumbersExample.class.getSimpleName(); + + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Range within which to generate numbers. */ + private static final int RANGE = 1000; + + /** Test duration. */ + private static final long DURATION = 2 * 60 * 1000; + + /** Flag indicating that the test is finished. */ + private static volatile boolean finished = false; + + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-compute.xml")) { + System.out.println(); + System.out.println(">>> Cache popular numbers example started."); + + /* + * Configure streaming cache. + * ========================= + */ + CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>(); + + cfg.setCacheMode(CacheMode.PARTITIONED); + cfg.setName(STREAM_NAME); + cfg.setIndexedTypes(Integer.class, Long.class); + + // Sliding window of 1 seconds. + cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new CreatedExpiryPolicy(new Duration(SECONDS, 1)))); + + /** + * Start the streaming cache on all server nodes. + * ============================================ + */ + try (IgniteCache<Integer, Long> stmCache = ignite.createCache(cfg)) { + // Check that that server nodes have been started. + if (ignite.cluster().forDataNodes(STREAM_NAME).nodes().isEmpty()) { + System.err.println("Data nodes not found (start data nodes with ExampleNodeStartup class)"); + + return; + } + + ExecutorService exe = startStreaming(ignite); + + long start = System.currentTimeMillis(); + + while (System.currentTimeMillis() - start < DURATION) { + // Select top 10 words. + SqlFieldsQuery top10 = new SqlFieldsQuery( + "select _key, _val from Long order by _val desc limit 10"); + + List<List<?>> results = stmCache.queryFields(top10).getAll(); + + for (List<?> res : results) + System.out.println(res.get(0) + "=" + res.get(1)); + + System.out.println("----------------"); + + Thread.sleep(5000); + } + + finished = true; + + exe.shutdown(); + } + catch (CacheException e) { + e.printStackTrace(); + + System.out.println("Destroying cache for name '" + STREAM_NAME + "'. Please try again."); + + ignite.destroyCache(STREAM_NAME); + } + } + } + + /** + * Populates the streaming cache in real time with numbers and keeps count for every number. + * + * @param ignite Ignite. + */ + private static ExecutorService startStreaming(final Ignite ignite) { + ExecutorService exe = Executors.newSingleThreadExecutor(); + + // Stream random numbers from another thread. + exe.submit(() -> { + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(STREAM_NAME)) { + // Allow data updates. + stmr.allowOverwrite(true); + + // Configure data transformation to count instances of the same word. + stmr.receiver(new StreamTransformer<>((e, args) -> { + Long val = e.getValue(); + + e.setValue(val == null ? 1L : val + 1); + + return null; + })); + + + while (!finished) + stmr.addData(RAND.nextInt(RANGE), 1L); + } + }); + + return exe; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala index 53033a0..5ff03de 100644 --- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala @@ -18,7 +18,7 @@ package org.apache.ignite.scalar.examples import org.apache.ignite.IgniteCache -import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.examples.java8.ExampleNodeStartup import org.apache.ignite.scalar.scalar import org.apache.ignite.scalar.scalar._ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala index 4c73a8a..f6e47a1 100644 --- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala @@ -19,7 +19,7 @@ package org.apache.ignite.scalar.examples import org.apache.ignite.events.Event import org.apache.ignite.events.EventType._ -import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.examples.java8.ExampleNodeStartup import org.apache.ignite.lang.IgnitePredicate import org.apache.ignite.scalar.scalar import org.apache.ignite.scalar.scalar._ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala index f00dec7..0acdf42 100644 --- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala @@ -17,18 +17,19 @@ package org.apache.ignite.scalar.examples -import org.apache.ignite.cache.query.SqlFieldsQuery -import org.apache.ignite.examples.ExampleNodeStartup -import org.apache.ignite.internal.util.scala.impl -import org.apache.ignite.scalar.scalar -import org.apache.ignite.scalar.scalar._ -import org.apache.ignite.{IgniteCache, IgniteDataStreamer, IgniteException} - -import javax.cache.processor.{EntryProcessor, MutableEntry} import java.lang.{Integer => JavaInt, Long => JavaLong} import java.util import java.util.Map.Entry import java.util.Timer +import javax.cache.processor.{EntryProcessor, MutableEntry} + +import org.apache.ignite.cache.query.SqlFieldsQuery +import org.apache.ignite.examples.java8.ExampleNodeStartup +import org.apache.ignite.internal.util.scala.impl +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ +import org.apache.ignite.stream.StreamReceiver +import org.apache.ignite.{IgniteCache, IgniteException} import scala.collection.JavaConversions._ import scala.util.Random @@ -107,7 +108,7 @@ object ScalarCachePopularNumbersExample extends App { // Reduce parallel operations since we running the whole ignite cluster locally under heavy load. val smtr = dataStreamer$[JavaInt, JavaLong](NAME, 2048) - smtr.updater(new IncrementingUpdater()) + smtr.receiver(new IncrementingUpdater()) (0 until CNT) foreach (_ => smtr.addData(RAND.nextInt(RANGE), 1L)) @@ -132,7 +133,7 @@ object ScalarCachePopularNumbersExample extends App { /** * Increments value for key. */ - private class IncrementingUpdater extends IgniteDataStreamer.Updater[JavaInt, JavaLong] { + private class IncrementingUpdater extends StreamReceiver[JavaInt, JavaLong] { private[this] final val INC = new EntryProcessor[JavaInt, JavaLong, Object]() { /** Process entries to increase value by entry key. */ override def process(e: MutableEntry[JavaInt, JavaLong], args: AnyRef*): Object = { @@ -144,7 +145,7 @@ object ScalarCachePopularNumbersExample extends App { } } - @impl def update(cache: IgniteCache[JavaInt, JavaLong], entries: util.Collection[Entry[JavaInt, JavaLong]]) { + @impl def receive(cache: IgniteCache[JavaInt, JavaLong], entries: util.Collection[Entry[JavaInt, JavaLong]]) { entries.foreach(entry => cache.invoke(entry.getKey, INC)) } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala index f5c13e9..27ac9ed 100644 --- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala @@ -17,18 +17,17 @@ package org.apache.ignite.scalar.examples -import org.apache.ignite.configuration.CacheConfiguration - import java.util._ -import org.apache.ignite.{IgniteCache, Ignite} -import org.apache.ignite.examples.ExampleNodeStartup import org.apache.ignite.cache.CacheMode._ import org.apache.ignite.cache.affinity.CacheAffinityKey +import org.apache.ignite.configuration.CacheConfiguration +import org.apache.ignite.examples.java8.ExampleNodeStartup import org.apache.ignite.scalar.scalar import org.apache.ignite.scalar.scalar._ +import org.apache.ignite.{Ignite, IgniteCache} -import collection.JavaConversions._ +import scala.collection.JavaConversions._ /** * Demonstrates cache ad-hoc queries with Scalar. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala index 75087ba..8a0ae5c 100644 --- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala @@ -17,17 +17,17 @@ package org.apache.ignite.scalar.examples +import java.lang.{Integer => JavaInt} +import java.util.ConcurrentModificationException +import javax.cache.Cache + import org.apache.ignite.IgniteCache -import org.apache.ignite.examples.ExampleNodeStartup import org.apache.ignite.cache.CacheMode +import org.apache.ignite.examples.java8.ExampleNodeStartup import org.apache.ignite.scalar.scalar import org.apache.ignite.scalar.scalar._ import org.jdk8.backport.ThreadLocalRandom8 -import javax.cache.Cache -import java.lang.{Integer => JavaInt} -import java.util.ConcurrentModificationException - import scala.collection.JavaConversions._ /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/test/java/org/apache/ignite/examples/MessagingExamplesSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java/org/apache/ignite/examples/MessagingExamplesSelfTest.java b/examples/src/test/java/org/apache/ignite/examples/MessagingExamplesSelfTest.java index e85ac2f..0932b47 100644 --- a/examples/src/test/java/org/apache/ignite/examples/MessagingExamplesSelfTest.java +++ b/examples/src/test/java/org/apache/ignite/examples/MessagingExamplesSelfTest.java @@ -46,7 +46,7 @@ public class MessagingExamplesSelfTest extends GridAbstractExamplesTest { /** * @throws Exception If failed. */ - public void testessagingPingPongListenActorExample() throws Exception { + public void testMessagingPingPongListenActorExample() throws Exception { MessagingPingPongListenActorExample.main(EMPTY_ARGS); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java index c6f28bf..ddfd7a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java @@ -18,9 +18,9 @@ package org.apache.ignite; import org.apache.ignite.lang.*; +import org.apache.ignite.stream.*; import org.jetbrains.annotations.*; -import java.io.*; import java.util.*; /** @@ -113,7 +113,7 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { /** * Sets flag indicating that this data streamer should assume * that there are no other concurrent updates to the cache. - * Should not be used when custom cache updater set using {@link #updater(IgniteDataStreamer.Updater)} method. + * Should not be used when custom cache receiver set using {@link #updater(IgniteDataStreamer.Updater)} method. * Default is {@code false}. When this flag is set, updates will not be propagated to the cache store. * * @param allowOverwrite Flag value. @@ -156,14 +156,14 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { public void perNodeBufferSize(int bufSize); /** - * Gets maximum number of parallel update operations for a single node. + * Gets maximum number of parallel stream operations for a single node. * * @return Maximum number of parallel stream operations for a single node. */ public int perNodeParallelOperations(); /** - * Sets maximum number of parallel update operations for a single node. + * Sets maximum number of parallel stream operations for a single node. * <p> * This method should be called prior to {@link #addData(Object, Object)} call. * <p> @@ -226,11 +226,11 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { public void deployClass(Class<?> depCls); /** - * Sets custom cache updater to this data streamer. + * Sets custom stream receiver to this data streamer. * - * @param updater Cache updater. + * @param updater Stream receiver. */ - public void updater(Updater<K, V> updater); + public void receiver(StreamReceiver<K, V> updater); /** * Adds key for removal on remote node. Equivalent to {@link #addData(Object, Object) addData(key, null)}. @@ -380,22 +380,4 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { */ @Override public void close() throws IgniteException, IgniteInterruptedException; - /** - * Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataStreamer#allowOverwrite(boolean)} - * property and appropriate internal cache updater will be chosen automatically. But in some cases to achieve best - * performance custom user-defined implementation may help. - * <p> - * Data streamer can be configured to use custom implementation of updater instead of default one using - * {@link IgniteDataStreamer#updater(IgniteDataStreamer.Updater)} method. - */ - interface Updater<K, V> extends Serializable { - /** - * Updates cache with batch of entries. - * - * @param cache Cache. - * @param entries Collection of entries. - * @throws IgniteException If failed. - */ - public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException; - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 4e4ce6a..9c26382 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -22,7 +22,6 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.compute.*; @@ -3827,7 +3826,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, DataStreamerImpl ldr = ctx.kernalContext().dataStream().dataStreamer(ctx.namex()); try { - ldr.updater(new IgniteDrDataStreamerCacheUpdater()); + ldr.receiver(new IgniteDrDataStreamerCacheUpdater()); LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, plc); @@ -4021,7 +4020,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, DataStreamerImpl ldr = ctx.kernalContext().dataStream().dataStreamer(ctx.namex()); try { - ldr.updater(new IgniteDrDataStreamerCacheUpdater()); + ldr.receiver(new IgniteDrDataStreamerCacheUpdater()); LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, plc0); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 2172d7a..132e584 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -288,7 +288,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter (DataStreamerImpl)ignite.dataStreamer(cacheName)) { ((DataStreamerImpl)dataLdr).maxRemapCount(0); - dataLdr.updater(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched()); + dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched()); for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) { if (!locPart.isEmpty() && locPart.primary(topVer)) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 85624d4..3a2936f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.worker.*; import org.apache.ignite.marshaller.*; +import org.apache.ignite.stream.*; import org.apache.ignite.thread.*; import org.jetbrains.annotations.*; @@ -224,7 +225,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { clsLdr = dep.classLoader(); } - IgniteDataStreamer.Updater<K, V> updater; + StreamReceiver<K, V> updater; try { updater = marsh.unmarshal(req.updaterBytes(), clsLdr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java index 54061b0..7f6bb5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.datastreamer; import org.apache.ignite.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.stream.*; import org.jetbrains.annotations.*; import java.util.*; @@ -29,13 +30,13 @@ import java.util.*; */ public class DataStreamerCacheUpdaters { /** */ - private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual(); + private static final StreamReceiver INDIVIDUAL = new Individual(); /** */ - private static final IgniteDataStreamer.Updater BATCHED = new Batched(); + private static final StreamReceiver BATCHED = new Batched(); /** */ - private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted(); + private static final StreamReceiver BATCHED_SORTED = new BatchedSorted(); /** * Updates cache using independent {@link org.apache.ignite.internal.processors.cache.GridCache#put(Object, Object, CacheEntryPredicate[])} and @@ -44,7 +45,7 @@ public class DataStreamerCacheUpdaters { * * @return Single updater. */ - public static <K, V> IgniteDataStreamer.Updater<K, V> individual() { + public static <K, V> StreamReceiver<K, V> individual() { return INDIVIDUAL; } @@ -55,7 +56,7 @@ public class DataStreamerCacheUpdaters { * * @return Batched updater. */ - public static <K, V> IgniteDataStreamer.Updater<K, V> batched() { + public static <K, V> StreamReceiver<K, V> batched() { return BATCHED; } @@ -66,7 +67,7 @@ public class DataStreamerCacheUpdaters { * * @return Batched sorted updater. */ - public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() { + public static <K extends Comparable<?>, V> StreamReceiver<K, V> batchedSorted() { return BATCHED_SORTED; } @@ -93,12 +94,12 @@ public class DataStreamerCacheUpdaters { /** * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone. */ - private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V>, InternalUpdater { + private static class Individual<K, V> implements StreamReceiver<K, V>, InternalUpdater { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { + @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { assert cache != null; assert !F.isEmpty(entries); @@ -120,12 +121,12 @@ public class DataStreamerCacheUpdaters { /** * Batched updater. Updates cache using batch operations thus is dead lock prone. */ - private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V>, InternalUpdater { + private static class Batched<K, V> implements StreamReceiver<K, V>, InternalUpdater { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { + @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { assert cache != null; assert !F.isEmpty(entries); @@ -160,12 +161,12 @@ public class DataStreamerCacheUpdaters { /** * Batched updater. Updates cache using batch operations thus is dead lock prone. */ - private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V>, InternalUpdater { + private static class BatchedSorted<K, V> implements StreamReceiver<K, V>, InternalUpdater { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { + @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { assert cache != null; assert !F.isEmpty(entries); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index dde98c6..05e7b44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; +import org.apache.ignite.stream.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; @@ -55,11 +56,11 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; */ @SuppressWarnings("unchecked") public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed { - /** Isolated updater. */ - private static final Updater ISOLATED_UPDATER = new IsolatedUpdater(); + /** Isolated receiver. */ + private static final StreamReceiver ISOLATED_UPDATER = new IsolatedUpdater(); - /** Cache updater. */ - private Updater<K, V> updater = ISOLATED_UPDATER; + /** Cache receiver. */ + private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER; /** */ private byte[] updaterBytes; @@ -286,15 +287,15 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } /** {@inheritDoc} */ - @Override public void updater(Updater<K, V> updater) { - A.notNull(updater, "updater"); + @Override public void receiver(StreamReceiver<K, V> rcvr) { + A.notNull(rcvr, "rcvr"); - this.updater = updater; + this.rcvr = rcvr; } /** {@inheritDoc} */ @Override public boolean allowOverwrite() { - return updater != ISOLATED_UPDATER; + return rcvr != ISOLATED_UPDATER; } /** {@inheritDoc} */ @@ -307,7 +308,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed if (node == null) throw new IgniteException("Failed to get node for cache: " + cacheName); - updater = allow ? DataStreamerCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER; + rcvr = allow ? DataStreamerCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER; } /** {@inheritDoc} */ @@ -537,7 +538,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed if (initPda) { jobPda = new DataStreamerPda(key.value(cacheObjCtx, false), entry.getValue() != null ? entry.getValue().value(cacheObjCtx, false) : null, - updater); + rcvr); initPda = false; } @@ -1068,7 +1069,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed if (isLocNode) { fut = ctx.closure().callLocalSafe( - new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, updater), false); + new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, rcvr), false); locFuts.add(fut); @@ -1099,9 +1100,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } if (updaterBytes == null) { - assert updater != null; + assert rcvr != null; - updaterBytes = ctx.config().getMarshaller().marshal(updater); + updaterBytes = ctx.config().getMarshaller().marshal(rcvr); } if (topicBytes == null) @@ -1348,15 +1349,15 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } /** - * Isolated updater which only loads entry initial value. + * Isolated receiver which only loads entry initial value. */ - private static class IsolatedUpdater implements Updater<KeyCacheObject, CacheObject>, + private static class IsolatedUpdater implements StreamReceiver<KeyCacheObject, CacheObject>, DataStreamerCacheUpdaters.InternalUpdater { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public void update(IgniteCache<KeyCacheObject, CacheObject> cache, + @Override public void receive(IgniteCache<KeyCacheObject, CacheObject> cache, Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) { IgniteCacheProxy<KeyCacheObject, CacheObject> proxy = (IgniteCacheProxy<KeyCacheObject, CacheObject>)cache; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java index 26f4965..a216ffe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java @@ -88,7 +88,7 @@ public class DataStreamerRequest implements Message { * @param reqId Request ID. * @param resTopicBytes Response topic. * @param cacheName Cache name. - * @param updaterBytes Cache updater. + * @param updaterBytes Cache receiver. * @param entries Entries to put. * @param ignoreDepOwnership Ignore ownership. * @param skipStore Skip store flag. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java index 1faf22d..c984558 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java @@ -22,6 +22,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.stream.*; import org.jetbrains.annotations.*; import java.util.*; @@ -49,7 +50,7 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { private final boolean skipStore; /** */ - private final IgniteDataStreamer.Updater updater; + private final StreamReceiver rcvr; /** * @param ctx Context. @@ -58,7 +59,7 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { * @param col Entries to put. * @param ignoreDepOwnership {@code True} to ignore deployment ownership. * @param skipStore Skip store flag. - * @param updater Updater. + * @param rcvr Updater. */ DataStreamerUpdateJob( GridKernalContext ctx, @@ -67,18 +68,18 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { Collection<DataStreamerEntry> col, boolean ignoreDepOwnership, boolean skipStore, - IgniteDataStreamer.Updater<?, ?> updater) { + StreamReceiver<?, ?> rcvr) { this.ctx = ctx; this.log = log; assert col != null && !col.isEmpty(); - assert updater != null; + assert rcvr != null; this.cacheName = cacheName; this.col = col; this.ignoreDepOwnership = ignoreDepOwnership; this.skipStore = skipStore; - this.updater = updater; + this.rcvr = rcvr; } /** {@inheritDoc} */ @@ -125,10 +126,10 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { } }); - updater.update(cache, col0); + rcvr.receive(cache, col0); } else - updater.update(cache, col); + rcvr.receive(cache, col); return null; } @@ -145,6 +146,6 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { * @return {@code True} if need to unwrap internal entries. */ private boolean unwrapEntries() { - return !(updater instanceof DataStreamerCacheUpdaters.InternalUpdater); + return !(rcvr instanceof DataStreamerCacheUpdaters.InternalUpdater); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java index af8b088..82ad62b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java @@ -26,19 +26,20 @@ import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.datastreamer.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.stream.*; import java.util.*; /** - * Data center replication cache updater for data streamer. + * Data center replication cache receiver for data streamer. */ -public class IgniteDrDataStreamerCacheUpdater implements IgniteDataStreamer.Updater<KeyCacheObject, CacheObject>, +public class IgniteDrDataStreamerCacheUpdater implements StreamReceiver<KeyCacheObject, CacheObject>, DataStreamerCacheUpdaters.InternalUpdater { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public void update(IgniteCache<KeyCacheObject, CacheObject> cache0, + @Override public void receive(IgniteCache<KeyCacheObject, CacheObject> cache0, Collection<Map.Entry<KeyCacheObject, CacheObject>> col) { try { String cacheName = cache0.getConfiguration(CacheConfiguration.class).getName(); @@ -66,7 +67,7 @@ public class IgniteDrDataStreamerCacheUpdater implements IgniteDataStreamer.Upda KeyCacheObject key = entry.getKey(); - // Ensure that updater to not receive special-purpose values for TTL and expire time. + // Ensure that receiver to not receive special-purpose values for TTL and expire time. assert entry.ttl() != CU.TTL_NOT_CHANGED && entry.ttl() != CU.TTL_ZERO && entry.ttl() >= 0; assert entry.expireTime() != CU.EXPIRE_TIME_CALCULATE && entry.expireTime() >= 0; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index d6db20f..67a4b1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; @@ -315,7 +314,7 @@ public class IgfsDataManager extends IgfsManager { if (cfg.getPerNodeParallelBatchCount() > 0) ldr.perNodeParallelOperations(cfg.getPerNodeParallelBatchCount()); - ldr.updater(DataStreamerCacheUpdaters.<IgfsBlockKey, byte[]>batchedSorted()); + ldr.receiver(DataStreamerCacheUpdaters.<IgfsBlockKey, byte[]>batchedSorted()); return ldr; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/stream/StreamReceiver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamReceiver.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamReceiver.java new file mode 100644 index 0000000..a3d9dfb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamReceiver.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream; + +import org.apache.ignite.*; + +import java.io.*; +import java.util.*; + +/** + * Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataStreamer#allowOverwrite(boolean)} + * property and appropriate internal cache receiver will be chosen automatically. But in some cases to achieve best + * performance custom user-defined implementation may help. + * <p> + * Data streamer can be configured to use custom implementation of the receiver instead of default one using + * {@link IgniteDataStreamer#receiver(StreamReceiver)} method. + */ +public interface StreamReceiver<K, V> extends Serializable { + /** + * Updates cache with batch of entries. + * + * @param cache Cache. + * @param entries Collection of entries. + * @throws org.apache.ignite.IgniteException If failed. + */ + public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java new file mode 100644 index 0000000..4ff9a59 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; + +import javax.cache.processor.*; +import java.util.*; + +/** + * Created by Dmitriy on 3/18/15. + */ +public class StreamTransformer<K, V> implements StreamReceiver<K, V> { + private EntryProcessor<K, V, Object> ep; + + public StreamTransformer(CacheEntryProcessor<K, V, Object> ep) { + this.ep = ep; + } + + /** {@inheritDoc} */ + @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException { + for (Map.Entry<K, V> entry : entries) + cache.invoke(entry.getKey(), ep); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java index 3a105cc..41c2288 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java @@ -188,7 +188,7 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid cacheAsync.get("key" + rnd); - assert cacheAsync.future().get() == rnd; + assert cacheAsync.<Integer>future().get() == rnd; cache.get("wrongKey");