# ignite-45 - fixing streaming.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/888f0a3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/888f0a3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/888f0a3d

Branch: refs/heads/ignite-501
Commit: 888f0a3d1c9d093bc1ea471b0462b4484d34d09c
Parents: 934d4e0
Author: Dmitiry Setrakyan <dsetrak...@gridgain.com>
Authored: Thu Mar 19 00:40:49 2015 -0400
Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com>
Committed: Thu Mar 19 00:40:49 2015 -0400

----------------------------------------------------------------------
 examples/config/example-streamer.xml            |  18 +-
 .../datagrid/CachePopularNumbersExample.java    |   7 +-
 .../StreamingPopularNumbersExample.java         | 113 +++++--------
 .../apache/ignite/examples/ComputeExample.java  |  57 -------
 .../ignite/examples/MessagingExample.java       | 162 ------------------
 .../ignite/examples/java8/ComputeExample.java   |  56 +++++++
 .../examples/java8/ExampleNodeStartup.java      |  35 ++++
 .../ignite/examples/java8/ExamplesUtils.java    |  99 +++++++++++
 .../ignite/examples/java8/MessagingExample.java | 162 ++++++++++++++++++
 .../StreamingPopularNumbersExample.java         | 163 +++++++++++++++++++
 .../examples/ScalarCacheAffinityExample.scala   |   2 +-
 .../scalar/examples/ScalarCacheExample.scala    |   2 +-
 .../ScalarCachePopularNumbersExample.scala      |  23 +--
 .../examples/ScalarCacheQueryExample.scala      |   9 +-
 .../examples/ScalarSnowflakeSchemaExample.scala |  10 +-
 .../examples/MessagingExamplesSelfTest.java     |   2 +-
 .../org/apache/ignite/IgniteDataStreamer.java   |  32 +---
 .../processors/cache/GridCacheAdapter.java      |   5 +-
 .../GridDistributedCacheAdapter.java            |   2 +-
 .../datastreamer/DataStreamProcessor.java       |   3 +-
 .../datastreamer/DataStreamerCacheUpdaters.java |  25 +--
 .../datastreamer/DataStreamerImpl.java          |  33 ++--
 .../datastreamer/DataStreamerRequest.java       |   2 +-
 .../datastreamer/DataStreamerUpdateJob.java     |  17 +-
 .../dr/IgniteDrDataStreamerCacheUpdater.java    |   9 +-
 .../processors/igfs/IgfsDataManager.java        |   3 +-
 .../apache/ignite/stream/StreamReceiver.java    |  42 +++++
 .../apache/ignite/stream/StreamTransformer.java |  41 +++++
 ...cheAbstractFullApiMultithreadedSelfTest.java |   2 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |   4 +-
 ...idCachePartitionedHitsAndMissesSelfTest.java |   7 +-
 .../DataStreamProcessorSelfTest.java            |  12 +-
 .../IgniteDataStreamerPerformanceTest.java      |   2 +-
 .../ignite/loadtests/streamer/IndexUpdater.java |   2 +-
 .../tests/p2p/GridExternalAffinityFunction.java |   2 +-
 35 files changed, 753 insertions(+), 412 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/config/example-streamer.xml
----------------------------------------------------------------------
diff --git a/examples/config/example-streamer.xml 
b/examples/config/example-streamer.xml
index 42790d9..e6a3a98 100644
--- a/examples/config/example-streamer.xml
+++ b/examples/config/example-streamer.xml
@@ -77,7 +77,7 @@
 
                     <property name="stages">
                         <list>
-                            <bean 
class="org.apache.ignite.examples.streaming.StreamingRunningAverageExample$StreamerStage"/>
+                            <bean 
class="org.apache.ignite.examples.java8.streaming.StreamingRunningAverageExample$StreamerStage"/>
                         </list>
                     </property>
 
@@ -120,7 +120,7 @@
                                 <list>
                                     <bean 
class="org.apache.ignite.streamer.index.tree.StreamerTreeIndexProvider">
                                         <property name="updater">
-                                            <bean 
class="org.apache.ignite.examples.streaming.StreamingPopularNumbersExample$IndexUpdater"/>
+                                            <bean 
class="org.apache.ignite.examples.java8.streaming.StreamingPopularNumbersExample$IndexUpdater"/>
                                         </property>
                                     </bean>
                                 </list>
@@ -130,7 +130,7 @@
 
                     <property name="stages">
                         <list>
-                            <bean 
class="org.apache.ignite.examples.streaming.StreamingPopularNumbersExample$StreamerStage"/>
+                            <bean 
class="org.apache.ignite.examples.java8.streaming.StreamingPopularNumbersExample$StreamerStage"/>
                         </list>
                     </property>
 
@@ -191,8 +191,8 @@
 
                     <property name="stages">
                         <list>
-                            <bean 
class="org.apache.ignite.examples.streaming.StreamingPriceBarsExample$FirstStage"/>
-                            <bean 
class="org.apache.ignite.examples.streaming.StreamingPriceBarsExample$SecondStage"/>
+                            <bean 
class="org.apache.ignite.examples.java8.streaming.StreamingPriceBarsExample$FirstStage"/>
+                            <bean 
class="org.apache.ignite.examples.java8.streaming.StreamingPriceBarsExample$SecondStage"/>
                         </list>
                     </property>
 
@@ -232,7 +232,7 @@
                                             <property name="unique" 
value="true"/>
 
                                             <property name="updater">
-                                                <bean 
class="org.apache.ignite.examples.streaming.StreamingCheckInExample$CheckInEventIndexUpdater"/>
+                                                <bean 
class="org.apache.ignite.examples.java8.streaming.StreamingCheckInExample$CheckInEventIndexUpdater"/>
                                             </property>
                                         </bean>
                                     </list>
@@ -248,7 +248,7 @@
                                             <property name="unique" 
value="true"/>
 
                                             <property name="updater">
-                                                <bean 
class="org.apache.ignite.examples.streaming.StreamingCheckInExample$PlacesIndexUpdater"/>
+                                                <bean 
class="org.apache.ignite.examples.java8.streaming.StreamingCheckInExample$PlacesIndexUpdater"/>
                                             </property>
                                         </bean>
                                     </list>
@@ -259,8 +259,8 @@
 
                     <property name="stages">
                         <list>
-                            <bean 
class="org.apache.ignite.examples.streaming.StreamingCheckInExample$AddToWindowStage"/>
-                            <bean 
class="org.apache.ignite.examples.streaming.StreamingCheckInExample$DetectPlacesStage"/>
+                            <bean 
class="org.apache.ignite.examples.java8.streaming.StreamingCheckInExample$AddToWindowStage"/>
+                            <bean 
class="org.apache.ignite.examples.java8.streaming.StreamingCheckInExample$DetectPlacesStage"/>
                         </list>
                     </property>
                 </bean>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
index 0b5fa15..47b83ac 100644
--- 
a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
+++ 
b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
@@ -23,6 +23,7 @@ import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.examples.*;
+import org.apache.ignite.stream.*;
 
 import javax.cache.processor.*;
 import java.util.*;
@@ -105,7 +106,7 @@ public class CachePopularNumbersExample {
             // Set larger per-node buffer size since our state is relatively 
small.
             stmr.perNodeBufferSize(2048);
 
-            stmr.updater(new IncrementingUpdater());
+            stmr.receiver(new IncrementingUpdater());
 
             for (int i = 0; i < CNT; i++)
                 stmr.addData(RAND.nextInt(RANGE), 1L);
@@ -150,7 +151,7 @@ public class CachePopularNumbersExample {
     /**
      * Increments value for key.
      */
-    private static class IncrementingUpdater implements 
IgniteDataStreamer.Updater<Integer, Long> {
+    private static class IncrementingUpdater implements 
StreamReceiver<Integer, Long> {
         /** Process entries to increase value by entry key. */
         private static final EntryProcessor<Integer, Long, ?> INC = new 
EntryProcessor<Integer, Long, Object>() {
             @Override public Object process(MutableEntry<Integer, Long> e, 
Object... args) {
@@ -163,7 +164,7 @@ public class CachePopularNumbersExample {
         };
 
         /** {@inheritDoc} */
-        @Override public void update(IgniteCache<Integer, Long> cache, 
Collection<Map.Entry<Integer, Long>> entries) {
+        @Override public void receive(IgniteCache<Integer, Long> cache, 
Collection<Map.Entry<Integer, Long>> entries) {
             for (Map.Entry<Integer, Long> entry : entries)
                 cache.invoke(entry.getKey(), INC);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java
index 847debe..29cc2c1 100644
--- 
a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamingPopularNumbersExample.java
@@ -21,12 +21,14 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.stream.*;
 
 import javax.cache.*;
 import javax.cache.configuration.*;
 import javax.cache.expiry.*;
 import javax.cache.processor.*;
 import java.util.*;
+import java.util.concurrent.*;
 
 import static java.util.concurrent.TimeUnit.*;
 
@@ -40,30 +42,20 @@ import static java.util.concurrent.TimeUnit.*;
  * start node with {@code examples/config/example-compute.xml} configuration.
  */
 public class StreamingPopularNumbersExample {
-    /**
-     * Cache name.
-     */
+    /** Cache name. */
     private static final String STREAM_NAME = 
StreamingPopularNumbersExample.class.getSimpleName();
 
-    /**
-     * Count of most popular numbers to retrieve from cluster.
-     */
-    private static final int POPULAR_NUMBERS_CNT = 10;
-
-    /**
-     * Random number generator.
-     */
+    /** Random number generator. */
     private static final Random RAND = new Random();
 
-    /**
-     * Range within which to generate numbers.
-     */
+    /** Range within which to generate numbers. */
     private static final int RANGE = 1000;
 
-    /**
-     * Count of total numbers to generate.
-     */
-    private static final int CNT = 1000000;
+    /** Test duration. */
+    private static final long DURATION = 2 * 60 * 1000;
+
+    /** Flag indicating that the test is finished. */
+    private static volatile boolean finished = false;
 
     /**
      * Executes example.
@@ -99,36 +91,19 @@ public class StreamingPopularNumbersExample {
             try (IgniteCache<Integer, Long> stmCache = 
ignite.createCache(cfg)) {
                 // Check that server nodes have been started.
                 if 
(ignite.cluster().forDataNodes(STREAM_NAME).nodes().isEmpty()) {
-                    System.out.println("Ignite does not have streaming cache 
configured: " + STREAM_NAME);
+                    System.err.println("Data nodes not found (start data nodes 
with ExampleNodeStartup class)");
 
                     return;
                 }
 
-                // Stream random numbers from another thread.
-                Thread th = new Thread(new Runnable() {
-                    @Override
-                    public void run() {
-                        try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(STREAM_NAME)) {
-                            stmr.allowOverwrite(true); // Allow data updates.
-                            stmr.updater(new IncrementingUpdater());
-
-                            while (!Thread.currentThread().isInterrupted())
-                                stmr.addData(RAND.nextInt(RANGE), 1L);
-                        }
-                    }
-                });
-
-                th.start();
-
-                // Run this example for 2 minutes.
-                long duration = 2 * 60 * 1000;
+                ExecutorService exe = startStreaming(ignite);
 
                 long start = System.currentTimeMillis();
 
-                while (System.currentTimeMillis() - start < duration) {
+                while (System.currentTimeMillis() - start < DURATION) {
                     // Select top 10 words.
                     SqlFieldsQuery top10 = new SqlFieldsQuery(
-                        "select _key, _val from Long order by _val desc, _key 
limit ?").setArgs(10);
+                        "select _key, _val from Long order by _val desc limit 
10");
 
                     List<List<?>> results = 
stmCache.queryFields(top10).getAll();
 
@@ -140,8 +115,9 @@ public class StreamingPopularNumbersExample {
                     Thread.sleep(5000);
                 }
 
-                th.interrupt();
-                th.join();
+                finished = true;
+
+                exe.shutdown();
             }
             catch (CacheException e) {
                 e.printStackTrace();
@@ -157,38 +133,35 @@ public class StreamingPopularNumbersExample {
      * Populates cache in real time with numbers and keeps count for every 
number.
      *
      * @param ignite Ignite.
-     * @throws org.apache.ignite.IgniteException If failed.
      */
-    private static void streamData(final Ignite ignite) throws IgniteException 
{
-        try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(STREAM_NAME)) {
-            stmr.allowOverwrite(true); // Allow data updates.
-            stmr.updater(new IncrementingUpdater());
+    private static ExecutorService startStreaming(final Ignite ignite) {
+        ExecutorService exe = Executors.newSingleThreadExecutor();
 
-            while (!Thread.currentThread().isInterrupted())
-                stmr.addData(RAND.nextInt(RANGE), 1L);
-        }
-        catch (IgniteInterruptedException ignore) {}
-    }
+        // Stream random numbers from another thread.
+        exe.submit(new Runnable() {
+            @Override public void run() {
+                try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(STREAM_NAME)) {
+                    // Allow data updates.
+                    stmr.allowOverwrite(true);
 
-    /**
-     * Increments value for key.
-     */
-    private static class IncrementingUpdater implements 
IgniteDataStreamer.Updater<Integer, Long> {
-        @Override
-        public void update(IgniteCache<Integer, Long> cache, 
Collection<Map.Entry<Integer, Long>> entries) {
-            for (Map.Entry<Integer, Long> entry : entries) {
-                // Increment values by 1.
-                cache.invoke(entry.getKey(), new EntryProcessor<Integer, Long, 
Object>() {
-                    @Override
-                    public Object process(MutableEntry<Integer, Long> e, 
Object... args) {
-                        Long val = e.getValue();
-
-                        e.setValue(val == null ? 1L : val + 1);
-
-                        return null;
-                    }
-                });
+                    // Transform data when processing.
+                    stmr.receiver(new StreamTransformer<>(new 
EntryProcessor<Integer, Long, Object>() {
+                        @Override
+                        public Object process(MutableEntry<Integer, Long> e, 
Object... args) {
+                            Long val = e.getValue();
+
+                            e.setValue(val == null ? 1L : val + 1);
+
+                            return null;
+                        }
+                    }));
+
+                    while (!finished)
+                        stmr.addData(RAND.nextInt(RANGE), 1L);
+                }
             }
-        }
+        });
+
+        return exe;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java8/org/apache/ignite/examples/ComputeExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/ComputeExample.java 
b/examples/src/main/java8/org/apache/ignite/examples/ComputeExample.java
deleted file mode 100644
index f161777..0000000
--- a/examples/src/main/java8/org/apache/ignite/examples/ComputeExample.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples;
-
-import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
-
-/**
- * Demonstrates broadcasting and unicasting computations within cluster.
- * <p>
- * Remote nodes should always be started with special configuration file which
- * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-compute.xml'}.
- */
-public class ComputeExample {
-    /**
-     * Executes example.
-     *
-     * @param args Command line arguments, none required.
-     * @throws IgniteException If example execution failed.
-     */
-    public static void main(String[] args) throws IgniteException {
-        try (Ignite ignite = 
Ignition.start("examples/config/example-compute.xml")) {
-            System.out.println();
-            System.out.println(">>> Compute broadcast example started.");
-
-            // Broadcast closure to all cluster nodes.
-            ignite.compute().broadcast(() -> System.out.println("Hello 
World"));
-
-            // Unicast closure to some cluster node picked by load balancer.
-            ignite.compute().run(() -> System.out.println("Hello World"));
-
-            // Unicast closure to some cluster node picked by load balancer 
and return result.
-            int length = ignite.compute().call("Hello World"::length);
-
-            System.out.println();
-            System.out.println(">>> Computed length: " + length);
-
-            System.out.println();
-            System.out.println(">>> Check all nodes for hello message 
output.");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java8/org/apache/ignite/examples/MessagingExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/MessagingExample.java 
b/examples/src/main/java8/org/apache/ignite/examples/MessagingExample.java
deleted file mode 100644
index 97b46a5..0000000
--- a/examples/src/main/java8/org/apache/ignite/examples/MessagingExample.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-
-import java.util.concurrent.*;
-
-/**
- * Example that demonstrates how to exchange messages between nodes. Use such
- * functionality for cases when you need to communicate to other nodes outside
- * of ignite task.
- * <p>
- * To run this example you must have at least one remote node started.
- * <p>
- * Remote nodes should always be started with special configuration file which
- * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-compute.xml'}.
- * <p>
- * Alternatively you can run {@link ExampleNodeStartup} in another JVM which 
will start node
- * with {@code examples/config/example-compute.xml} configuration.
- */
-public final class MessagingExample {
-    /** Number of messages. */
-    private static final int MESSAGES_NUM = 10;
-
-    /** Message topics. */
-    private enum TOPIC { ORDERED, UNORDERED }
-
-    /**
-     * Executes example.
-     *
-     * @param args Command line arguments, none required.
-     * @throws IgniteException If example execution failed.
-     */
-    public static void main(String[] args) throws Exception {
-        try (Ignite ignite = 
Ignition.start("examples/config/example-compute.xml")) {
-            if (!ExamplesUtils.checkMinTopologySize(ignite.cluster(), 2)) {
-                System.out.println();
-                System.out.println(">>> Please start at least 2 cluster nodes 
to run example.");
-                System.out.println();
-
-                return;
-            }
-
-            System.out.println();
-            System.out.println(">>> Messaging example started.");
-
-            // Group for remote nodes.
-            ClusterGroup rmtGrp = ignite.cluster().forRemotes();
-
-            // Listen for messages from remote nodes to make sure that they 
received all the messages.
-            int msgCnt = rmtGrp.nodes().size() * MESSAGES_NUM;
-
-            CountDownLatch orderedLatch = new CountDownLatch(msgCnt);
-            CountDownLatch unorderedLatch = new CountDownLatch(msgCnt);
-
-            localListen(ignite.message(ignite.cluster().forLocal()), 
orderedLatch, unorderedLatch);
-
-            // Register listeners on all cluster nodes.
-            startListening(ignite, ignite.message(rmtGrp));
-
-            // Send unordered messages to all remote nodes.
-            for (int i = 0; i < MESSAGES_NUM; i++)
-                ignite.message(rmtGrp).send(TOPIC.UNORDERED, 
Integer.toString(i));
-
-            System.out.println(">>> Finished sending unordered messages.");
-
-            // Send ordered messages to all remote nodes.
-            for (int i = 0; i < MESSAGES_NUM; i++)
-                ignite.message(rmtGrp).sendOrdered(TOPIC.ORDERED, 
Integer.toString(i), 0);
-
-            System.out.println(">>> Finished sending ordered messages.");
-            System.out.println(">>> Check output on all nodes for message 
printouts.");
-            System.out.println(">>> Will wait for messages acknowledgements 
from all remote nodes.");
-
-            orderedLatch.await();
-            unorderedLatch.await();
-
-            System.out.println(">>> Messaging example finished.");
-        }
-    }
-
-    /**
-     * Start listening to messages on remote cluster nodes.
-     *
-     * @param ignite Ignite.
-     * @param imsg Ignite messaging.
-     * @throws IgniteException If failed.
-     */
-    private static void startListening(final Ignite ignite, IgniteMessaging 
imsg) throws IgniteException {
-        // Add ordered message listener.
-        imsg.remoteListen(TOPIC.ORDERED, (nodeId, msg) -> {
-            System.out.println("Received ordered message [msg=" + msg + ", 
fromNodeId=" + nodeId + ']');
-
-            try {
-                
ignite.message(ignite.cluster().forNodeId(nodeId)).send(TOPIC.ORDERED, msg);
-            }
-            catch (IgniteException e) {
-                e.printStackTrace();
-            }
-
-            return true; // Return true to continue listening.
-        });
-
-        // Add unordered message listener.
-        imsg.remoteListen(TOPIC.UNORDERED, (nodeId, msg) -> {
-            System.out.println("Received unordered message [msg=" + msg + ", 
fromNodeId=" + nodeId + ']');
-
-            try {
-                
ignite.message(ignite.cluster().forNodeId(nodeId)).send(TOPIC.UNORDERED, msg);
-            }
-            catch (IgniteException e) {
-                e.printStackTrace();
-            }
-
-            return true; // Return true to continue listening.
-        });
-    }
-
-    /**
-     * Listen for messages from remote nodes.
-     *
-     * @param imsg Ignite messaging.
-     * @param orderedLatch Latch for ordered messages acks.
-     * @param unorderedLatch Latch for unordered messages acks.
-     */
-    private static void localListen(
-        IgniteMessaging imsg,
-        final CountDownLatch orderedLatch,
-        final CountDownLatch unorderedLatch
-    ) {
-        imsg.localListen(TOPIC.ORDERED, (nodeId, msg) -> {
-            orderedLatch.countDown();
-
-            // Return true to continue listening, false to stop.
-            return orderedLatch.getCount() > 0;
-        });
-
-        imsg.localListen(TOPIC.UNORDERED, (nodeId, msg) -> {
-            unorderedLatch.countDown();
-
-            // Return true to continue listening, false to stop.
-            return unorderedLatch.getCount() > 0;
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java8/org/apache/ignite/examples/java8/ComputeExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/ComputeExample.java 
b/examples/src/main/java8/org/apache/ignite/examples/java8/ComputeExample.java
new file mode 100644
index 0000000..557d65c
--- /dev/null
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/ComputeExample.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8;
+
+import org.apache.ignite.*;
+
+/**
+ * Demonstrates broadcasting and unicasting computations within cluster.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-compute.xml'}.
+ */
+public class ComputeExample {
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     * @throws IgniteException If example execution failed.
+     */
+    public static void main(String[] args) throws IgniteException {
+        try (Ignite ignite = 
Ignition.start("examples/config/example-compute.xml")) {
+            System.out.println();
+            System.out.println(">>> Compute broadcast example started.");
+
+            // Broadcast closure to all cluster nodes.
+            ignite.compute().broadcast(() -> System.out.println("Hello 
World"));
+
+            // Unicast closure to some cluster node picked by load balancer.
+            ignite.compute().run(() -> System.out.println("Hello World"));
+
+            // Unicast closure to some cluster node picked by load balancer 
and return result.
+            int length = ignite.compute().call("Hello World"::length);
+
+            System.out.println();
+            System.out.println(">>> Computed length: " + length);
+
+            System.out.println();
+            System.out.println(">>> Check all nodes for hello message 
output.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java8/org/apache/ignite/examples/java8/ExampleNodeStartup.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/ExampleNodeStartup.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/ExampleNodeStartup.java
new file mode 100644
index 0000000..6f5ba93
--- /dev/null
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/ExampleNodeStartup.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8;
+
+import org.apache.ignite.*;
+
+/**
+ * Starts up an empty node with example compute configuration.
+ */
+public class ExampleNodeStartup {
+    /**
+     * Start up an empty node with example compute configuration.
+     *
+     * @param args Command line arguments, none required.
+     * @throws IgniteException If failed.
+     */
+    public static void main(String[] args) throws IgniteException {
+        Ignition.start("examples/config/example-compute.xml");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java 
b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java
new file mode 100644
index 0000000..5b62bd4
--- /dev/null
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/ExamplesUtils.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.streamer.*;
+
+import java.net.*;
+
+/**
+ * Static helper checks (memory, topology, resources) shared by the examples.
+ */
+public class ExamplesUtils {
+    /** Class loader of this class, used by {@link #url(String)} to find resources. */
+    private static final ClassLoader CLS_LDR = 
ExamplesUtils.class.getClassLoader();
+
+    /**
+     * Exits with code {@code -1} if maximum JVM memory is below 85% of the minimum 
allowed threshold.
+     *
+     * @param min Minimum memory threshold, in bytes (reported to the user in MB).
+     */
+    public static void checkMinMemory(long min) {
+        long maxMem = Runtime.getRuntime().maxMemory();
+
+        if (maxMem < .85 * min) {
+            System.err.println("Heap limit is too low (" + (maxMem / (1024 * 
1024)) +
+                "MB), please increase heap size at least up to " + (min / 
(1024 * 1024)) + "MB.");
+
+            System.exit(-1);
+        }
+    }
+
+    /**
+     * Resolves resource {@code path} to a URL via the examples class loader.
+     *
+     * @return Resolved URL; a {@link RuntimeException} is thrown if unresolved.
+     */
+    public static URL url(String path) {
+        URL url = CLS_LDR.getResource(path);
+
+        if (url == null)
+            throw new RuntimeException("Failed to resolve resource URL by 
path: " + path);
+
+        return url;
+    }
+
+    /**
+     * Checks minimum topology size, printing a hint to stdout if it is not met.
+     *
+     * @param prj Cluster to check size for.
+     * @param size Minimum number of nodes required to run a certain example.
+     * @return {@code True} if check passed, {@code false} otherwise.
+     */
+    public static boolean checkMinTopologySize(ClusterGroup prj, int size) {
+        int prjSize = prj.nodes().size();
+
+        if (prjSize < size) {
+            System.out.println();
+            System.out.println(">>> Please start at least " + size + " cluster 
nodes to run example.");
+            System.out.println();
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * @param ignite Ignite whose streamer configurations are searched.
+     * @param name Streamer name; must not be {@code null} (it is dereferenced).
+     * @return {@code True} if ignite has streamer with given name.
+     */
+    public static boolean hasStreamer(Ignite ignite, String name) {
+        if (ignite.configuration().getStreamerConfiguration() != null) {
+            for (StreamerConfiguration cfg : 
ignite.configuration().getStreamerConfiguration()) {
+                if (name.equals(cfg.getName()))
+                    return true;
+            }
+        }
+
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java8/org/apache/ignite/examples/java8/MessagingExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/MessagingExample.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/MessagingExample.java
new file mode 100644
index 0000000..621c6d4
--- /dev/null
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/MessagingExample.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+
+import java.util.concurrent.*;
+
+/**
+ * Example that demonstrates how to exchange messages between nodes. Use such
+ * functionality for cases when you need to communicate to other nodes outside
+ * of ignite task.
+ * <p>
+ * To run this example you must have at least one remote node started.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-compute.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which 
will start node
+ * with {@code examples/config/example-compute.xml} configuration.
+ */
+public final class MessagingExample {
+    /** Number of messages sent to the remote group on each topic. */
+    private static final int MESSAGES_NUM = 10;
+
+    /** Message topics. */
+    private enum TOPIC { ORDERED, UNORDERED }
+
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     * @throws Exception If example execution failed.
+     */
+    public static void main(String[] args) throws Exception {
+        try (Ignite ignite = 
Ignition.start("examples/config/example-compute.xml")) {
+            if (!ExamplesUtils.checkMinTopologySize(ignite.cluster(), 2)) {
+                System.out.println();
+                System.out.println(">>> Please start at least 2 cluster nodes 
to run example.");
+                System.out.println();
+
+                return;
+            }
+
+            System.out.println();
+            System.out.println(">>> Messaging example started.");
+
+            // Group for remote nodes.
+            ClusterGroup rmtGrp = ignite.cluster().forRemotes();
+
+            // Listen for messages from remote nodes to make sure that they 
received all the messages.
+            int msgCnt = rmtGrp.nodes().size() * MESSAGES_NUM;
+
+            CountDownLatch orderedLatch = new CountDownLatch(msgCnt);
+            CountDownLatch unorderedLatch = new CountDownLatch(msgCnt);
+
+            localListen(ignite.message(ignite.cluster().forLocal()), 
orderedLatch, unorderedLatch);
+
+            // Register listeners on all remote cluster nodes.
+            startListening(ignite, ignite.message(rmtGrp));
+
+            // Send unordered messages to all remote nodes.
+            for (int i = 0; i < MESSAGES_NUM; i++)
+                ignite.message(rmtGrp).send(TOPIC.UNORDERED, 
Integer.toString(i));
+
+            System.out.println(">>> Finished sending unordered messages.");
+
+            // Send ordered messages to all remote nodes.
+            for (int i = 0; i < MESSAGES_NUM; i++)
+                ignite.message(rmtGrp).sendOrdered(TOPIC.ORDERED, 
Integer.toString(i), 0);
+
+            System.out.println(">>> Finished sending ordered messages.");
+            System.out.println(">>> Check output on all nodes for message 
printouts.");
+            System.out.println(">>> Will wait for messages acknowledgements 
from all remote nodes.");
+
+            orderedLatch.await();
+            unorderedLatch.await();
+
+            System.out.println(">>> Messaging example finished.");
+        }
+    }
+
+    /**
+     * Start listening to messages on remote cluster nodes.
+     *
+     * @param ignite Ignite instance the listeners use to send the echo back.
+     * @param imsg Ignite messaging for the group the listeners are registered on.
+     * @throws IgniteException If failed.
+     */
+    private static void startListening(final Ignite ignite, IgniteMessaging 
imsg) throws IgniteException {
+        // Add ordered message listener that echoes each message to its sender.
+        imsg.remoteListen(TOPIC.ORDERED, (nodeId, msg) -> {
+            System.out.println("Received ordered message [msg=" + msg + ", 
fromNodeId=" + nodeId + ']');
+
+            try {
+                
ignite.message(ignite.cluster().forNodeId(nodeId)).send(TOPIC.ORDERED, msg);
+            }
+            catch (IgniteException e) {
+                e.printStackTrace();
+            }
+
+            return true; // Return true to continue listening.
+        });
+
+        // Add unordered message listener that echoes each message to its sender.
+        imsg.remoteListen(TOPIC.UNORDERED, (nodeId, msg) -> {
+            System.out.println("Received unordered message [msg=" + msg + ", 
fromNodeId=" + nodeId + ']');
+
+            try {
+                
ignite.message(ignite.cluster().forNodeId(nodeId)).send(TOPIC.UNORDERED, msg);
+            }
+            catch (IgniteException e) {
+                e.printStackTrace();
+            }
+
+            return true; // Return true to continue listening.
+        });
+    }
+
+    /**
+     * Registers local listeners that count down a latch per received message.
+     *
+     * @param imsg Ignite messaging.
+     * @param orderedLatch Latch for ordered messages acks.
+     * @param unorderedLatch Latch for unordered messages acks.
+     */
+    private static void localListen(
+        IgniteMessaging imsg,
+        final CountDownLatch orderedLatch,
+        final CountDownLatch unorderedLatch
+    ) {
+        imsg.localListen(TOPIC.ORDERED, (nodeId, msg) -> {
+            orderedLatch.countDown();
+
+            // Return true to continue listening, false to stop.
+            return orderedLatch.getCount() > 0;
+        });
+
+        imsg.localListen(TOPIC.UNORDERED, (nodeId, msg) -> {
+            unorderedLatch.countDown();
+
+            // Return true to continue listening, false to stop.
+            return unorderedLatch.getCount() > 0;
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamingPopularNumbersExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamingPopularNumbersExample.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamingPopularNumbersExample.java
new file mode 100644
index 0000000..3b33402
--- /dev/null
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/StreamingPopularNumbersExample.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8.streaming;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.examples.java8.*;
+import org.apache.ignite.stream.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.expiry.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+
+/**
+ * Real time popular numbers counter.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-compute.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which 
will
+ * start node with {@code examples/config/example-compute.xml} configuration.
+ */
+public class StreamingPopularNumbersExample {
+    /** Name of the streaming cache (simple name of this class). */
+    private static final String STREAM_NAME = 
StreamingPopularNumbersExample.class.getSimpleName();
+
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Exclusive upper bound of generated numbers: values are in [0, RANGE). */
+    private static final int RANGE = 1000;
+
+    /** Example duration in milliseconds (2 minutes). */
+    private static final long DURATION = 2 * 60 * 1000;
+
+    /** Set when the example is finished; polled by the streaming thread to stop. */
+    private static volatile boolean finished = false;
+
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     * @throws Exception If example execution failed.
+     */
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-compute.xml")) {
+            System.out.println();
+            System.out.println(">>> Cache popular numbers example started.");
+
+            /*
+             * Configure streaming cache.
+             * =========================
+             */
+            CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>();
+
+            cfg.setCacheMode(CacheMode.PARTITIONED);
+            cfg.setName(STREAM_NAME);
+            cfg.setIndexedTypes(Integer.class, Long.class);
+
+            // Sliding window of 1 second (entries expire 1 second after creation).
+            cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new 
CreatedExpiryPolicy(new Duration(SECONDS, 1))));
+
+            /*
+             * Start the streaming cache on all server nodes.
+             * ============================================
+             */
+            try (IgniteCache<Integer, Long> stmCache = 
ignite.createCache(cfg)) {
+                // Check that server nodes have been started.
+                if 
(ignite.cluster().forDataNodes(STREAM_NAME).nodes().isEmpty()) {
+                    System.err.println("Data nodes not found (start data nodes 
with ExampleNodeStartup class)");
+
+                    return;
+                }
+
+                ExecutorService exe = startStreaming(ignite);
+
+                long start = System.currentTimeMillis();
+
+                while (System.currentTimeMillis() - start < DURATION) {
+                    // Select top 10 numbers by count.
+                    SqlFieldsQuery top10 = new SqlFieldsQuery(
+                        "select _key, _val from Long order by _val desc limit 
10");
+
+                    List<List<?>> results = 
stmCache.queryFields(top10).getAll();
+
+                    for (List<?> res : results)
+                        System.out.println(res.get(0) + "=" + res.get(1));
+
+                    System.out.println("----------------");
+
+                    Thread.sleep(5000);
+                }
+
+                finished = true;
+
+                exe.shutdown();
+            }
+            catch (CacheException e) {
+                e.printStackTrace();
+
+                System.out.println("Destroying cache for name '" + STREAM_NAME 
+ "'. Please try again.");
+
+                ignite.destroyCache(STREAM_NAME);
+            }
+        }
+    }
+
+    /**
+     * Starts a single-thread executor that streams random numbers into the cache, 
counting every number, and returns that executor so the caller can shut it down.
+     *
+     * @param ignite Ignite.
+     */
+    private static ExecutorService startStreaming(final Ignite ignite) {
+        ExecutorService exe = Executors.newSingleThreadExecutor();
+
+        // Stream random numbers from another thread.
+        exe.submit(() -> {
+            try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(STREAM_NAME)) {
+                // Allow data updates.
+                stmr.allowOverwrite(true);
+
+                // Configure data transformation to count instances of the 
same word.
+                stmr.receiver(new StreamTransformer<>((e, args) -> {
+                    Long val = e.getValue();
+
+                    e.setValue(val == null ? 1L : val + 1);
+
+                    return null;
+                }));
+
+
+                while (!finished)
+                    stmr.addData(RAND.nextInt(RANGE), 1L);
+            }
+        });
+
+        return exe;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala
 
b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala
index 53033a0..5ff03de 100644
--- 
a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala
+++ 
b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala
@@ -18,7 +18,7 @@
 package org.apache.ignite.scalar.examples
 
 import org.apache.ignite.IgniteCache
-import org.apache.ignite.examples.ExampleNodeStartup
+import org.apache.ignite.examples.java8.ExampleNodeStartup
 import org.apache.ignite.scalar.scalar
 import org.apache.ignite.scalar.scalar._
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala
 
b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala
index 4c73a8a..f6e47a1 100644
--- 
a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala
+++ 
b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala
@@ -19,7 +19,7 @@ package org.apache.ignite.scalar.examples
 
 import org.apache.ignite.events.Event
 import org.apache.ignite.events.EventType._
-import org.apache.ignite.examples.ExampleNodeStartup
+import org.apache.ignite.examples.java8.ExampleNodeStartup
 import org.apache.ignite.lang.IgnitePredicate
 import org.apache.ignite.scalar.scalar
 import org.apache.ignite.scalar.scalar._

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala
 
b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala
index f00dec7..0acdf42 100644
--- 
a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala
+++ 
b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala
@@ -17,18 +17,19 @@
 
 package org.apache.ignite.scalar.examples
 
-import org.apache.ignite.cache.query.SqlFieldsQuery
-import org.apache.ignite.examples.ExampleNodeStartup
-import org.apache.ignite.internal.util.scala.impl
-import org.apache.ignite.scalar.scalar
-import org.apache.ignite.scalar.scalar._
-import org.apache.ignite.{IgniteCache, IgniteDataStreamer, IgniteException}
-
-import javax.cache.processor.{EntryProcessor, MutableEntry}
 import java.lang.{Integer => JavaInt, Long => JavaLong}
 import java.util
 import java.util.Map.Entry
 import java.util.Timer
+import javax.cache.processor.{EntryProcessor, MutableEntry}
+
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.examples.java8.ExampleNodeStartup
+import org.apache.ignite.internal.util.scala.impl
+import org.apache.ignite.scalar.scalar
+import org.apache.ignite.scalar.scalar._
+import org.apache.ignite.stream.StreamReceiver
+import org.apache.ignite.{IgniteCache, IgniteException}
 
 import scala.collection.JavaConversions._
 import scala.util.Random
@@ -107,7 +108,7 @@ object ScalarCachePopularNumbersExample extends App {
         // Reduce parallel operations since we running the whole ignite 
cluster locally under heavy load.
         val smtr = dataStreamer$[JavaInt, JavaLong](NAME, 2048)
 
-        smtr.updater(new IncrementingUpdater())
+        smtr.receiver(new IncrementingUpdater())
 
         (0 until CNT) foreach (_ => smtr.addData(RAND.nextInt(RANGE), 1L))
 
@@ -132,7 +133,7 @@ object ScalarCachePopularNumbersExample extends App {
     /**
      * Increments value for key.
      */
-    private class IncrementingUpdater extends 
IgniteDataStreamer.Updater[JavaInt, JavaLong] {
+    private class IncrementingUpdater extends StreamReceiver[JavaInt, 
JavaLong] {
         private[this] final val INC = new EntryProcessor[JavaInt, JavaLong, 
Object]() {
             /** Process entries to increase value by entry key. */
             override def process(e: MutableEntry[JavaInt, JavaLong], args: 
AnyRef*): Object = {
@@ -144,7 +145,7 @@ object ScalarCachePopularNumbersExample extends App {
             }
         }
 
-        @impl def update(cache: IgniteCache[JavaInt, JavaLong], entries: 
util.Collection[Entry[JavaInt, JavaLong]]) {
+        @impl def receive(cache: IgniteCache[JavaInt, JavaLong], entries: 
util.Collection[Entry[JavaInt, JavaLong]]) {
             entries.foreach(entry => cache.invoke(entry.getKey, INC))
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala
 
b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala
index f5c13e9..27ac9ed 100644
--- 
a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala
+++ 
b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala
@@ -17,18 +17,17 @@
 
 package org.apache.ignite.scalar.examples
 
-import org.apache.ignite.configuration.CacheConfiguration
-
 import java.util._
 
-import org.apache.ignite.{IgniteCache, Ignite}
-import org.apache.ignite.examples.ExampleNodeStartup
 import org.apache.ignite.cache.CacheMode._
 import org.apache.ignite.cache.affinity.CacheAffinityKey
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.examples.java8.ExampleNodeStartup
 import org.apache.ignite.scalar.scalar
 import org.apache.ignite.scalar.scalar._
+import org.apache.ignite.{Ignite, IgniteCache}
 
-import collection.JavaConversions._
+import scala.collection.JavaConversions._
 
 /**
  * Demonstrates cache ad-hoc queries with Scalar.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala
 
b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala
index 75087ba..8a0ae5c 100644
--- 
a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala
+++ 
b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala
@@ -17,17 +17,17 @@
 
 package org.apache.ignite.scalar.examples
 
+import java.lang.{Integer => JavaInt}
+import java.util.ConcurrentModificationException
+import javax.cache.Cache
+
 import org.apache.ignite.IgniteCache
-import org.apache.ignite.examples.ExampleNodeStartup
 import org.apache.ignite.cache.CacheMode
+import org.apache.ignite.examples.java8.ExampleNodeStartup
 import org.apache.ignite.scalar.scalar
 import org.apache.ignite.scalar.scalar._
 import org.jdk8.backport.ThreadLocalRandom8
 
-import javax.cache.Cache
-import java.lang.{Integer => JavaInt}
-import java.util.ConcurrentModificationException
-
 import scala.collection.JavaConversions._
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/examples/src/test/java/org/apache/ignite/examples/MessagingExamplesSelfTest.java
----------------------------------------------------------------------
diff --git 
a/examples/src/test/java/org/apache/ignite/examples/MessagingExamplesSelfTest.java
 
b/examples/src/test/java/org/apache/ignite/examples/MessagingExamplesSelfTest.java
index e85ac2f..0932b47 100644
--- 
a/examples/src/test/java/org/apache/ignite/examples/MessagingExamplesSelfTest.java
+++ 
b/examples/src/test/java/org/apache/ignite/examples/MessagingExamplesSelfTest.java
@@ -46,7 +46,7 @@ public class MessagingExamplesSelfTest extends 
GridAbstractExamplesTest {
     /**
      * @throws Exception If failed.
      */
-    public void testessagingPingPongListenActorExample() throws Exception {
+    public void testMessagingPingPongListenActorExample() throws Exception {
         MessagingPingPongListenActorExample.main(EMPTY_ARGS);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index c6f28bf..ddfd7a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -18,9 +18,9 @@
 package org.apache.ignite;
 
 import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
 import org.jetbrains.annotations.*;
 
-import java.io.*;
 import java.util.*;
 
 /**
@@ -113,7 +113,7 @@ public interface IgniteDataStreamer<K, V> extends 
AutoCloseable {
     /**
      * Sets flag indicating that this data streamer should assume
      * that there are no other concurrent updates to the cache.
-     * Should not be used when custom cache updater set using {@link 
#updater(IgniteDataStreamer.Updater)} method.
+     * Should not be used when custom stream receiver set using {@link 
#receiver(StreamReceiver)} method.
      * Default is {@code false}. When this flag is set, updates will not be 
propagated to the cache store.
      *
      * @param allowOverwrite Flag value.
@@ -156,14 +156,14 @@ public interface IgniteDataStreamer<K, V> extends 
AutoCloseable {
     public void perNodeBufferSize(int bufSize);
 
     /**
-     * Gets maximum number of parallel update operations for a single node.
+     * Gets maximum number of parallel stream operations for a single node.
      *
      * @return Maximum number of parallel stream operations for a single node.
      */
     public int perNodeParallelOperations();
 
     /**
-     * Sets maximum number of parallel update operations for a single node.
+     * Sets maximum number of parallel stream operations for a single node.
      * <p>
      * This method should be called prior to {@link #addData(Object, Object)} 
call.
      * <p>
@@ -226,11 +226,11 @@ public interface IgniteDataStreamer<K, V> extends 
AutoCloseable {
     public void deployClass(Class<?> depCls);
 
     /**
-     * Sets custom cache updater to this data streamer.
+     * Sets custom stream receiver to this data streamer.
      *
-     * @param updater Cache updater.
+     * @param updater Stream receiver.
      */
-    public void updater(Updater<K, V> updater);
+    public void receiver(StreamReceiver<K, V> updater);
 
     /**
      * Adds key for removal on remote node. Equivalent to {@link 
#addData(Object, Object) addData(key, null)}.
@@ -380,22 +380,4 @@ public interface IgniteDataStreamer<K, V> extends 
AutoCloseable {
      */
     @Override public void close() throws IgniteException, 
IgniteInterruptedException;
 
-    /**
-     * Updates cache with batch of entries. Usually it is enough to configure 
{@link IgniteDataStreamer#allowOverwrite(boolean)}
-     * property and appropriate internal cache updater will be chosen 
automatically. But in some cases to achieve best
-     * performance custom user-defined implementation may help.
-     * <p>
-     * Data streamer can be configured to use custom implementation of updater 
instead of default one using
-     * {@link IgniteDataStreamer#updater(IgniteDataStreamer.Updater)} method.
-     */
-    interface Updater<K, V> extends Serializable {
-        /**
-         * Updates cache with batch of entries.
-         *
-         * @param cache Cache.
-         * @param entries Collection of entries.
-         * @throws IgniteException If failed.
-         */
-        public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, 
V>> entries) throws IgniteException;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 4e4ce6a..9c26382 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -22,7 +22,6 @@ import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.compute.*;
@@ -3827,7 +3826,7 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
             DataStreamerImpl ldr = 
ctx.kernalContext().dataStream().dataStreamer(ctx.namex());
 
             try {
-                ldr.updater(new IgniteDrDataStreamerCacheUpdater());
+                ldr.receiver(new IgniteDrDataStreamerCacheUpdater());
 
                 LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, 
plc);
 
@@ -4021,7 +4020,7 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
             DataStreamerImpl ldr = 
ctx.kernalContext().dataStream().dataStreamer(ctx.namex());
 
             try {
-                ldr.updater(new IgniteDrDataStreamerCacheUpdater());
+                ldr.receiver(new IgniteDrDataStreamerCacheUpdater());
 
                 LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, 
plc0);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 2172d7a..132e584 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -288,7 +288,7 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
                          (DataStreamerImpl)ignite.dataStreamer(cacheName)) {
                     ((DataStreamerImpl)dataLdr).maxRemapCount(0);
 
-                    dataLdr.updater(DataStreamerCacheUpdaters.<KeyCacheObject, 
Object>batched());
+                    
dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched());
 
                     for (GridDhtLocalPartition locPart : 
dht.topology().currentLocalPartitions()) {
                         if (!locPart.isEmpty() && locPart.primary(topVer)) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 85624d4..3a2936f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.marshaller.*;
+import org.apache.ignite.stream.*;
 import org.apache.ignite.thread.*;
 import org.jetbrains.annotations.*;
 
@@ -224,7 +225,7 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
                 clsLdr = dep.classLoader();
             }
 
-            IgniteDataStreamer.Updater<K, V> updater;
+            StreamReceiver<K, V> updater;
 
             try {
                 updater = marsh.unmarshal(req.updaterBytes(), clsLdr);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
index 54061b0..7f6bb5b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.datastreamer;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.stream.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -29,13 +30,13 @@ import java.util.*;
  */
 public class DataStreamerCacheUpdaters {
     /** */
-    private static final IgniteDataStreamer.Updater INDIVIDUAL = new 
Individual();
+    private static final StreamReceiver INDIVIDUAL = new Individual();
 
     /** */
-    private static final IgniteDataStreamer.Updater BATCHED = new Batched();
+    private static final StreamReceiver BATCHED = new Batched();
 
     /** */
-    private static final IgniteDataStreamer.Updater BATCHED_SORTED = new 
BatchedSorted();
+    private static final StreamReceiver BATCHED_SORTED = new BatchedSorted();
 
     /**
      * Updates cache using independent {@link 
org.apache.ignite.internal.processors.cache.GridCache#put(Object, Object, 
CacheEntryPredicate[])} and
@@ -44,7 +45,7 @@ public class DataStreamerCacheUpdaters {
      *
      * @return Single updater.
      */
-    public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
+    public static <K, V> StreamReceiver<K, V> individual() {
         return INDIVIDUAL;
     }
 
@@ -55,7 +56,7 @@ public class DataStreamerCacheUpdaters {
      *
      * @return Batched updater.
      */
-    public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
+    public static <K, V> StreamReceiver<K, V> batched() {
         return BATCHED;
     }
 
@@ -66,7 +67,7 @@ public class DataStreamerCacheUpdaters {
      *
      * @return Batched sorted updater.
      */
-    public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, 
V> batchedSorted() {
+    public static <K extends Comparable<?>, V> StreamReceiver<K, V> 
batchedSorted() {
         return BATCHED_SORTED;
     }
 
@@ -93,12 +94,12 @@ public class DataStreamerCacheUpdaters {
     /**
      * Simple cache updater implementation. Updates keys one by one thus is 
not dead lock prone.
      */
-    private static class Individual<K, V> implements 
IgniteDataStreamer.Updater<K, V>, InternalUpdater {
+    private static class Individual<K, V> implements StreamReceiver<K, V>, 
InternalUpdater {
         /** */
         private static final long serialVersionUID = 0L;
 
         /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) {
+        @Override public void receive(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) {
             assert cache != null;
             assert !F.isEmpty(entries);
 
@@ -120,12 +121,12 @@ public class DataStreamerCacheUpdaters {
     /**
      * Batched updater. Updates cache using batch operations thus is dead lock 
prone.
      */
-    private static class Batched<K, V> implements 
IgniteDataStreamer.Updater<K, V>, InternalUpdater {
+    private static class Batched<K, V> implements StreamReceiver<K, V>, 
InternalUpdater {
         /** */
         private static final long serialVersionUID = 0L;
 
         /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) {
+        @Override public void receive(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) {
             assert cache != null;
             assert !F.isEmpty(entries);
 
@@ -160,12 +161,12 @@ public class DataStreamerCacheUpdaters {
     /**
      * Batched updater. Updates cache using batch operations thus is dead lock 
prone.
      */
-    private static class BatchedSorted<K, V> implements 
IgniteDataStreamer.Updater<K, V>, InternalUpdater {
+    private static class BatchedSorted<K, V> implements StreamReceiver<K, V>, 
InternalUpdater {
         /** */
         private static final long serialVersionUID = 0L;
 
         /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) {
+        @Override public void receive(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) {
             assert cache != null;
             assert !F.isEmpty(entries);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index dde98c6..05e7b44 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
@@ -55,11 +56,11 @@ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
  */
 @SuppressWarnings("unchecked")
 public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, 
Delayed {
-    /** Isolated updater. */
-    private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();
+    /** Isolated receiver. */
+    private static final StreamReceiver ISOLATED_UPDATER = new 
IsolatedUpdater();
 
-    /** Cache updater. */
-    private Updater<K, V> updater = ISOLATED_UPDATER;
+    /** Cache receiver. */
+    private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;
 
     /** */
     private byte[] updaterBytes;
@@ -286,15 +287,15 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
     }
 
     /** {@inheritDoc} */
-    @Override public void updater(Updater<K, V> updater) {
-        A.notNull(updater, "updater");
+    @Override public void receiver(StreamReceiver<K, V> rcvr) {
+        A.notNull(rcvr, "rcvr");
 
-        this.updater = updater;
+        this.rcvr = rcvr;
     }
 
     /** {@inheritDoc} */
     @Override public boolean allowOverwrite() {
-        return updater != ISOLATED_UPDATER;
+        return rcvr != ISOLATED_UPDATER;
     }
 
     /** {@inheritDoc} */
@@ -307,7 +308,7 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
         if (node == null)
             throw new IgniteException("Failed to get node for cache: " + 
cacheName);
 
-        updater = allow ? DataStreamerCacheUpdaters.<K, V>individual() : 
ISOLATED_UPDATER;
+        rcvr = allow ? DataStreamerCacheUpdaters.<K, V>individual() : 
ISOLATED_UPDATER;
     }
 
     /** {@inheritDoc} */
@@ -537,7 +538,7 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                 if (initPda) {
                     jobPda = new DataStreamerPda(key.value(cacheObjCtx, false),
                         entry.getValue() != null ? 
entry.getValue().value(cacheObjCtx, false) : null,
-                        updater);
+                        rcvr);
 
                     initPda = false;
                 }
@@ -1068,7 +1069,7 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
 
             if (isLocNode) {
                 fut = ctx.closure().callLocalSafe(
-                    new DataStreamerUpdateJob(ctx, log, cacheName, entries, 
false, skipStore, updater), false);
+                    new DataStreamerUpdateJob(ctx, log, cacheName, entries, 
false, skipStore, rcvr), false);
 
                 locFuts.add(fut);
 
@@ -1099,9 +1100,9 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                     }
 
                     if (updaterBytes == null) {
-                        assert updater != null;
+                        assert rcvr != null;
 
-                        updaterBytes = 
ctx.config().getMarshaller().marshal(updater);
+                        updaterBytes = 
ctx.config().getMarshaller().marshal(rcvr);
                     }
 
                     if (topicBytes == null)
@@ -1348,15 +1349,15 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
     }
 
     /**
-     * Isolated updater which only loads entry initial value.
+     * Isolated receiver which only loads entry initial value.
      */
-    private static class IsolatedUpdater implements Updater<KeyCacheObject, 
CacheObject>,
+    private static class IsolatedUpdater implements 
StreamReceiver<KeyCacheObject, CacheObject>,
         DataStreamerCacheUpdaters.InternalUpdater {
         /** */
         private static final long serialVersionUID = 0L;
 
         /** {@inheritDoc} */
-        @Override public void update(IgniteCache<KeyCacheObject, CacheObject> 
cache,
+        @Override public void receive(IgniteCache<KeyCacheObject, CacheObject> 
cache,
             Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) {
             IgniteCacheProxy<KeyCacheObject, CacheObject> proxy = 
(IgniteCacheProxy<KeyCacheObject, CacheObject>)cache;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
index 26f4965..a216ffe 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
@@ -88,7 +88,7 @@ public class DataStreamerRequest implements Message {
      * @param reqId Request ID.
      * @param resTopicBytes Response topic.
      * @param cacheName Cache name.
-     * @param updaterBytes Cache updater.
+     * @param updaterBytes Marshalled cache receiver.
      * @param entries Entries to put.
      * @param ignoreDepOwnership Ignore ownership.
      * @param skipStore Skip store flag.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
index 1faf22d..c984558 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.stream.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -49,7 +50,7 @@ class DataStreamerUpdateJob implements 
GridPlainCallable<Object> {
     private final boolean skipStore;
 
     /** */
-    private final IgniteDataStreamer.Updater updater;
+    private final StreamReceiver rcvr;
 
     /**
      * @param ctx Context.
@@ -58,7 +59,7 @@ class DataStreamerUpdateJob implements 
GridPlainCallable<Object> {
      * @param col Entries to put.
      * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
      * @param skipStore Skip store flag.
-     * @param updater Updater.
+     * @param rcvr Stream receiver.
      */
     DataStreamerUpdateJob(
         GridKernalContext ctx,
@@ -67,18 +68,18 @@ class DataStreamerUpdateJob implements 
GridPlainCallable<Object> {
         Collection<DataStreamerEntry> col,
         boolean ignoreDepOwnership,
         boolean skipStore,
-        IgniteDataStreamer.Updater<?, ?> updater) {
+        StreamReceiver<?, ?> rcvr) {
         this.ctx = ctx;
         this.log = log;
 
         assert col != null && !col.isEmpty();
-        assert updater != null;
+        assert rcvr != null;
 
         this.cacheName = cacheName;
         this.col = col;
         this.ignoreDepOwnership = ignoreDepOwnership;
         this.skipStore = skipStore;
-        this.updater = updater;
+        this.rcvr = rcvr;
     }
 
     /** {@inheritDoc} */
@@ -125,10 +126,10 @@ class DataStreamerUpdateJob implements 
GridPlainCallable<Object> {
                     }
                 });
 
-                updater.update(cache, col0);
+                rcvr.receive(cache, col0);
             }
             else
-                updater.update(cache, col);
+                rcvr.receive(cache, col);
 
             return null;
         }
@@ -145,6 +146,6 @@ class DataStreamerUpdateJob implements 
GridPlainCallable<Object> {
      * @return {@code True} if need to unwrap internal entries.
      */
     private boolean unwrapEntries() {
-        return !(updater instanceof DataStreamerCacheUpdaters.InternalUpdater);
+        return !(rcvr instanceof DataStreamerCacheUpdaters.InternalUpdater);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
index af8b088..82ad62b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
@@ -26,19 +26,20 @@ import 
org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.datastreamer.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.stream.*;
 
 import java.util.*;
 
 /**
- * Data center replication cache updater for data streamer.
+ * Data center replication cache receiver for data streamer.
  */
-public class IgniteDrDataStreamerCacheUpdater implements 
IgniteDataStreamer.Updater<KeyCacheObject, CacheObject>,
+public class IgniteDrDataStreamerCacheUpdater implements 
StreamReceiver<KeyCacheObject, CacheObject>,
     DataStreamerCacheUpdaters.InternalUpdater {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** {@inheritDoc} */
-    @Override public void update(IgniteCache<KeyCacheObject, CacheObject> 
cache0,
+    @Override public void receive(IgniteCache<KeyCacheObject, CacheObject> 
cache0,
         Collection<Map.Entry<KeyCacheObject, CacheObject>> col) {
         try {
             String cacheName = 
cache0.getConfiguration(CacheConfiguration.class).getName();
@@ -66,7 +67,7 @@ public class IgniteDrDataStreamerCacheUpdater implements 
IgniteDataStreamer.Upda
 
                 KeyCacheObject key = entry.getKey();
 
-                // Ensure that updater to not receive special-purpose values 
for TTL and expire time.
+                // Ensure that the receiver does not receive special-purpose values 
for TTL and expire time.
                 assert entry.ttl() != CU.TTL_NOT_CHANGED && entry.ttl() != 
CU.TTL_ZERO && entry.ttl() >= 0;
                 assert entry.expireTime() != CU.EXPIRE_TIME_CALCULATE && 
entry.expireTime() >= 0;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index d6db20f..67a4b1a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
@@ -315,7 +314,7 @@ public class IgfsDataManager extends IgfsManager {
         if (cfg.getPerNodeParallelBatchCount() > 0)
             ldr.perNodeParallelOperations(cfg.getPerNodeParallelBatchCount());
 
-        ldr.updater(DataStreamerCacheUpdaters.<IgfsBlockKey, 
byte[]>batchedSorted());
+        ldr.receiver(DataStreamerCacheUpdaters.<IgfsBlockKey, 
byte[]>batchedSorted());
 
         return ldr;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/stream/StreamReceiver.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/stream/StreamReceiver.java 
b/modules/core/src/main/java/org/apache/ignite/stream/StreamReceiver.java
new file mode 100644
index 0000000..a3d9dfb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamReceiver.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import org.apache.ignite.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Updates cache with a batch of entries. Usually it is enough to configure the 
{@link IgniteDataStreamer#allowOverwrite(boolean)}
+ * property, and an appropriate internal cache receiver will be chosen 
automatically. But in some cases, to achieve the best
+ * performance, a custom user-defined implementation may help.
+ * <p>
+ * Data streamer can be configured to use a custom implementation of the 
receiver instead of the default one using
+ * {@link IgniteDataStreamer#receiver(StreamReceiver)} method.
+ */
+public interface StreamReceiver<K, V> extends Serializable {
+    /**
+     * Updates cache with batch of entries.
+     *
+     * @param cache Cache.
+     * @param entries Collection of entries.
+     * @throws org.apache.ignite.IgniteException If failed.
+     */
+    public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> 
entries) throws IgniteException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java 
b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
new file mode 100644
index 0000000..4ff9a59
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+
+import javax.cache.processor.*;
+import java.util.*;
+
+/**
+ * Stream receiver that invokes the configured entry processor on the key of every received entry.
+ */
+public class StreamTransformer<K, V> implements StreamReceiver<K, V> {
+    private EntryProcessor<K, V, Object> ep;
+
+    public StreamTransformer(CacheEntryProcessor<K, V, Object> ep) {
+        this.ep = ep;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void receive(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) throws IgniteException {
+        for (Map.Entry<K, V> entry : entries)
+            cache.invoke(entry.getKey(), ep);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/888f0a3d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
index 3a105cc..41c2288 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
@@ -188,7 +188,7 @@ public abstract class 
GridCacheAbstractFullApiMultithreadedSelfTest extends Grid
 
                 cacheAsync.get("key" + rnd);
 
-                assert cacheAsync.future().get() == rnd;
+                assert cacheAsync.<Integer>future().get() == rnd;
 
                 cache.get("wrongKey");
 

Reply via email to