Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-394 e5f686239 -> 9c8217c17


# gg-9869


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/96f426bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/96f426bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/96f426bb

Branch: refs/heads/ignite-394
Commit: 96f426bb0b5b3d19badac0610726f58ca4a0c15e
Parents: e5f6862
Author: Artem Shutak <ashu...@gridgain.com>
Authored: Wed Mar 4 15:24:33 2015 +0300
Committer: Artem Shutak <ashu...@gridgain.com>
Committed: Wed Mar 4 15:24:33 2015 +0300

----------------------------------------------------------------------
 .../ignite/examples/CacheExamplesSelfTest.java  |   2 +-
 .../org/apache/ignite/IgniteDataStreamer.java   |   2 +-
 .../GridDistributedCacheAdapter.java            |   2 +-
 .../dataload/GridDataLoadCacheUpdaters.java     | 199 -------------------
 .../dataload/GridDataLoadUpdateJob.java         | 119 -----------
 .../IgniteDataStreamerCacheUpdaters.java        | 199 +++++++++++++++++++
 .../dataload/IgniteDataStreamerImpl.java        |  10 +-
 .../dataload/IgniteDataStreamerProcessor.java   |   2 +-
 .../dataload/IgniteDataStreamerUpdateJob.java   | 119 +++++++++++
 .../processors/igfs/IgfsDataManager.java        |   2 +-
 .../IgniteDataStreamerImplSelfTest.java         |   4 +-
 .../IgniteDataStreamerPerformanceTest.java      |   2 +-
 .../IgniteDataStreamerProcessorSelfTest.java    |  22 +-
 13 files changed, 342 insertions(+), 342 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
----------------------------------------------------------------------
diff --git 
a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java 
b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
index c5c4599..14af44f 100644
--- 
a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
+++ 
b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
@@ -114,7 +114,7 @@ public class CacheExamplesSelfTest extends 
GridAbstractExamplesTest {
     /**
      * @throws Exception If failed.
      */
-    public void testCacheDataLoaderExample() throws Exception {
+    public void testCacheDataStreamerExample() throws Exception {
         CacheDataStreamerExample.main(EMPTY_ARGS);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index 22aa0c1..a47c079 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -33,7 +33,7 @@ import java.util.*;
  * data may get to remote nodes in different order from which it was added to
  * the loader.
  * <p>
- * Also note that {@code GridDataLoader} is not the only way to load data into 
cache.
+ * Also note that {@code IgniteDataStreamer} is not the only way to load data 
into cache.
  * Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, 
Object...)}
  * method to load data from underlying data store. You can also use standard
  * cache {@code put(...)} and {@code putAll(...)} operations as well, but they 
most

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 5791b8d..16419f9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -279,7 +279,7 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
                 try (IgniteDataStreamer<K, V> dataLdr = 
ignite.dataStreamer(cacheName)) {
                     ((IgniteDataStreamerImpl)dataLdr).maxRemapCount(0);
 
-                    dataLdr.updater(GridDataLoadCacheUpdaters.<K, V>batched());
+                    dataLdr.updater(IgniteDataStreamerCacheUpdaters.<K, 
V>batched());
 
                     for (GridDhtLocalPartition<K, V> locPart : 
dht.topology().currentLocalPartitions()) {
                         if (!locPart.isEmpty() && locPart.primary(topVer)) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
deleted file mode 100644
index 78a7e62..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Bundled factory for cache updaters.
- */
-public class GridDataLoadCacheUpdaters {
-    /** */
-    private static final IgniteDataStreamer.Updater INDIVIDUAL = new 
Individual();
-
-    /** */
-    private static final IgniteDataStreamer.Updater BATCHED = new Batched();
-
-    /** */
-    private static final IgniteDataStreamer.Updater BATCHED_SORTED = new 
BatchedSorted();
-
-    /**
-     * Updates cache using independent {@link 
org.apache.ignite.cache.GridCache#put(Object, Object, 
org.apache.ignite.lang.IgnitePredicate[])} and
-     * {@link org.apache.ignite.cache.GridCache#remove(Object, 
org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from 
deadlocks but performance
-     * is not the best.
-     *
-     * @return Single updater.
-     */
-    public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
-        return INDIVIDUAL;
-    }
-
-    /**
-     * Updates cache using batched methods {@link 
org.apache.ignite.cache.GridCache#putAll(Map, 
org.apache.ignite.lang.IgnitePredicate[])} and
-     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, 
org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same 
keys are getting
-     * updated concurrently. Performance is generally better than in {@link 
#individual()}.
-     *
-     * @return Batched updater.
-     */
-    public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
-        return BATCHED;
-    }
-
-    /**
-     * Updates cache using batched methods {@link 
org.apache.ignite.cache.GridCache#putAll(Map, 
org.apache.ignite.lang.IgnitePredicate[])} and
-     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, 
org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order 
and if all updates
-     * use the same rule deadlock can not happen. Performance is generally 
better than in {@link #individual()}.
-     *
-     * @return Batched sorted updater.
-     */
-    public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, 
V> batchedSorted() {
-        return BATCHED_SORTED;
-    }
-
-    /**
-     * Updates cache.
-     *
-     * @param cache Cache.
-     * @param rmvCol Keys to remove.
-     * @param putMap Entries to put.
-     * @throws IgniteException If failed.
-     */
-    protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable 
Set<K> rmvCol,
-        Map<K, V> putMap) {
-        assert rmvCol != null || putMap != null;
-
-        // Here we assume that there are no key duplicates, so the following 
calls are valid.
-        if (rmvCol != null)
-            ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol);
-
-        if (putMap != null)
-            cache.putAll(putMap);
-    }
-
-    /**
-     * Simple cache updater implementation. Updates keys one by one thus is 
not dead lock prone.
-     */
-    private static class Individual<K, V> implements 
IgniteDataStreamer.Updater<K, V> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) {
-            assert cache != null;
-            assert !F.isEmpty(entries);
-
-            for (Map.Entry<K, V> entry : entries) {
-                K key = entry.getKey();
-
-                assert key != null;
-
-                V val = entry.getValue();
-
-                if (val == null)
-                    cache.remove(key);
-                else
-                    cache.put(key, val);
-            }
-        }
-    }
-
-    /**
-     * Batched updater. Updates cache using batch operations thus is dead lock 
prone.
-     */
-    private static class Batched<K, V> implements 
IgniteDataStreamer.Updater<K, V> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) {
-            assert cache != null;
-            assert !F.isEmpty(entries);
-
-            Map<K, V> putAll = null;
-            Set<K> rmvAll = null;
-
-            for (Map.Entry<K, V> entry : entries) {
-                K key = entry.getKey();
-
-                assert key != null;
-
-                V val = entry.getValue();
-
-                if (val == null) {
-                    if (rmvAll == null)
-                        rmvAll = new HashSet<>();
-
-                    rmvAll.add(key);
-                }
-                else {
-                    if (putAll == null)
-                        putAll = new HashMap<>();
-
-                    putAll.put(key, val);
-                }
-            }
-
-            updateAll(cache, rmvAll, putAll);
-        }
-    }
-
-    /**
-     * Batched updater. Updates cache using batch operations thus is dead lock 
prone.
-     */
-    private static class BatchedSorted<K, V> implements 
IgniteDataStreamer.Updater<K, V> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) {
-            assert cache != null;
-            assert !F.isEmpty(entries);
-
-            Map<K, V> putAll = null;
-            Set<K> rmvAll = null;
-
-            for (Map.Entry<K, V> entry : entries) {
-                K key = entry.getKey();
-
-                assert key instanceof Comparable;
-
-                V val = entry.getValue();
-
-                if (val == null) {
-                    if (rmvAll == null)
-                        rmvAll = new TreeSet<>();
-
-                    rmvAll.add(key);
-                }
-                else {
-                    if (putAll == null)
-                        putAll = new TreeMap<>();
-
-                    putAll.put(key, val);
-                }
-            }
-
-            updateAll(cache, rmvAll, putAll);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
deleted file mode 100644
index 8aa554a..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Job to put entries to cache on affinity node.
- */
-class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> {
-    /** */
-    private final GridKernalContext ctx;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** Cache name. */
-    private final String cacheName;
-
-    /** Entries to put. */
-    private final Collection<Map.Entry<K, V>> col;
-
-    /** {@code True} to ignore deployment ownership. */
-    private final boolean ignoreDepOwnership;
-
-    /** */
-    private final boolean skipStore;
-
-    /** */
-    private final IgniteDataStreamer.Updater<K, V> updater;
-
-    /**
-     * @param ctx Context.
-     * @param log Log.
-     * @param cacheName Cache name.
-     * @param col Entries to put.
-     * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
-     * @param updater Updater.
-     */
-    GridDataLoadUpdateJob(
-        GridKernalContext ctx,
-        IgniteLogger log,
-        @Nullable String cacheName,
-        Collection<Map.Entry<K, V>> col,
-        boolean ignoreDepOwnership,
-        boolean skipStore,
-        IgniteDataStreamer.Updater<K, V> updater) {
-        this.ctx = ctx;
-        this.log = log;
-
-        assert col != null && !col.isEmpty();
-        assert updater != null;
-
-        this.cacheName = cacheName;
-        this.col = col;
-        this.ignoreDepOwnership = ignoreDepOwnership;
-        this.skipStore = skipStore;
-        this.updater = updater;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Object call() throws Exception {
-        if (log.isDebugEnabled())
-            log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", 
size=" + col.size() + ']');
-
-//        TODO IGNITE-77: restore adapter usage.
-//        GridCacheAdapter<Object, Object> cache = 
ctx.cache().internalCache(cacheName);
-//
-//        IgniteFuture<?> f = cache.context().preloader().startFuture();
-//
-//        if (!f.isDone())
-//            f.get();
-//
-//        if (ignoreDepOwnership)
-//            cache.context().deploy().ignoreOwnership(true);
-
-        IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName);
-
-        if (skipStore)
-            cache = (IgniteCacheProxy<K, V>)cache.withSkipStore();
-
-        if (ignoreDepOwnership)
-            cache.context().deploy().ignoreOwnership(true);
-
-        try {
-            updater.update(cache, col);
-
-            return null;
-        }
-        finally {
-            if (ignoreDepOwnership)
-                cache.context().deploy().ignoreOwnership(false);
-
-            if (log.isDebugEnabled())
-                log.debug("Update job finished on node: " + ctx.localNodeId());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
new file mode 100644
index 0000000..1742041
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Bundled factory for cache updaters.
+ */
+public class IgniteDataStreamerCacheUpdaters {
+    /** */
+    private static final IgniteDataStreamer.Updater INDIVIDUAL = new 
Individual();
+
+    /** */
+    private static final IgniteDataStreamer.Updater BATCHED = new Batched();
+
+    /** */
+    private static final IgniteDataStreamer.Updater BATCHED_SORTED = new 
BatchedSorted();
+
+    /**
+     * Updates cache using independent {@link 
org.apache.ignite.cache.GridCache#put(Object, Object, 
org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#remove(Object, 
org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from 
deadlocks but performance
+     * is not the best.
+     *
+     * @return Single updater.
+     */
+    public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
+        return INDIVIDUAL;
+    }
+
+    /**
+     * Updates cache using batched methods {@link 
org.apache.ignite.cache.GridCache#putAll(Map, 
org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, 
org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same 
keys are getting
+     * updated concurrently. Performance is generally better than in {@link 
#individual()}.
+     *
+     * @return Batched updater.
+     */
+    public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
+        return BATCHED;
+    }
+
+    /**
+     * Updates cache using batched methods {@link 
org.apache.ignite.cache.GridCache#putAll(Map, 
org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, 
org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order 
and if all updates
+     * use the same rule deadlock can not happen. Performance is generally 
better than in {@link #individual()}.
+     *
+     * @return Batched sorted updater.
+     */
+    public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, 
V> batchedSorted() {
+        return BATCHED_SORTED;
+    }
+
+    /**
+     * Updates cache.
+     *
+     * @param cache Cache.
+     * @param rmvCol Keys to remove.
+     * @param putMap Entries to put.
+     * @throws IgniteException If failed.
+     */
+    protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable 
Set<K> rmvCol,
+        Map<K, V> putMap) {
+        assert rmvCol != null || putMap != null;
+
+        // Here we assume that there are no key duplicates, so the following 
calls are valid.
+        if (rmvCol != null)
+            ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol);
+
+        if (putMap != null)
+            cache.putAll(putMap);
+    }
+
+    /**
+     * Simple cache updater implementation. Updates keys one by one thus is 
not dead lock prone.
+     */
+    private static class Individual<K, V> implements 
IgniteDataStreamer.Updater<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key != null;
+
+                V val = entry.getValue();
+
+                if (val == null)
+                    cache.remove(key);
+                else
+                    cache.put(key, val);
+            }
+        }
+    }
+
+    /**
+     * Batched updater. Updates cache using batch operations thus is dead lock 
prone.
+     */
+    private static class Batched<K, V> implements 
IgniteDataStreamer.Updater<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            Map<K, V> putAll = null;
+            Set<K> rmvAll = null;
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key != null;
+
+                V val = entry.getValue();
+
+                if (val == null) {
+                    if (rmvAll == null)
+                        rmvAll = new HashSet<>();
+
+                    rmvAll.add(key);
+                }
+                else {
+                    if (putAll == null)
+                        putAll = new HashMap<>();
+
+                    putAll.put(key, val);
+                }
+            }
+
+            updateAll(cache, rmvAll, putAll);
+        }
+    }
+
+    /**
+     * Batched updater. Updates cache using batch operations thus is dead lock 
prone.
+     */
+    private static class BatchedSorted<K, V> implements 
IgniteDataStreamer.Updater<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            Map<K, V> putAll = null;
+            Set<K> rmvAll = null;
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key instanceof Comparable;
+
+                V val = entry.getValue();
+
+                if (val == null) {
+                    if (rmvAll == null)
+                        rmvAll = new TreeSet<>();
+
+                    rmvAll.add(key);
+                }
+                else {
+                    if (putAll == null)
+                        putAll = new TreeMap<>();
+
+                    putAll.put(key, val);
+                }
+            }
+
+            updateAll(cache, rmvAll, putAll);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
index 80d08c8..1231e27 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
@@ -302,7 +302,7 @@ public class IgniteDataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, D
         if (node == null)
             throw new IgniteException("Failed to get node for cache: " + 
cacheName);
 
-        updater = allow ? GridDataLoadCacheUpdaters.<K, V>individual() : 
ISOLATED_UPDATER;
+        updater = allow ? IgniteDataStreamerCacheUpdaters.<K, V>individual() : 
ISOLATED_UPDATER;
     }
 
     /** {@inheritDoc} */
@@ -450,7 +450,7 @@ public class IgniteDataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, D
                 assert key != null;
 
                 if (initPda) {
-                    jobPda = new DataLoaderPda(key, entry.getValue(), updater);
+                    jobPda = new DataStreamerPda(key, entry.getValue(), 
updater);
 
                     initPda = false;
                 }
@@ -981,7 +981,7 @@ public class IgniteDataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, D
 
             if (isLocNode) {
                 fut = ctx.closure().callLocalSafe(
-                    new GridDataLoadUpdateJob<>(ctx, log, cacheName, entries, 
false, skipStore, updater), false);
+                    new IgniteDataStreamerUpdateJob<>(ctx, log, cacheName, 
entries, false, skipStore, updater), false);
 
                 locFuts.add(fut);
 
@@ -1193,7 +1193,7 @@ public class IgniteDataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, D
     /**
      * Data streamer peer-deploy aware.
      */
-    private class DataLoaderPda implements GridPeerDeployAware {
+    private class DataStreamerPda implements GridPeerDeployAware {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1211,7 +1211,7 @@ public class IgniteDataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, D
          *
          * @param objs Collection of objects to detect deploy class and class 
loader.
          */
-        private DataLoaderPda(Object... objs) {
+        private DataStreamerPda(Object... objs) {
             this.objs = Arrays.asList(objs);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
index 69ea440..7db41e6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
@@ -248,7 +248,7 @@ public class IgniteDataStreamerProcessor<K, V> extends 
GridProcessorAdapter {
                 return;
             }
 
-            GridDataLoadUpdateJob<K, V> job = new GridDataLoadUpdateJob<>(ctx,
+            IgniteDataStreamerUpdateJob<K, V> job = new 
IgniteDataStreamerUpdateJob<>(ctx,
                 log,
                 req.cacheName(),
                 col,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
new file mode 100644
index 0000000..1a3db40
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Job to put entries to cache on affinity node.
+ */
+class IgniteDataStreamerUpdateJob<K, V> implements GridPlainCallable<Object> {
+    /** */
+    private final GridKernalContext ctx;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /** Entries to put. */
+    private final Collection<Map.Entry<K, V>> col;
+
+    /** {@code True} to ignore deployment ownership. */
+    private final boolean ignoreDepOwnership;
+
+    /** */
+    private final boolean skipStore;
+
+    /** */
+    private final IgniteDataStreamer.Updater<K, V> updater;
+
+    /**
+     * @param ctx Context.
+     * @param log Log.
+     * @param cacheName Cache name.
+     * @param col Entries to put.
+     * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
+     * @param updater Updater.
+     */
+    IgniteDataStreamerUpdateJob(
+        GridKernalContext ctx,
+        IgniteLogger log,
+        @Nullable String cacheName,
+        Collection<Map.Entry<K, V>> col,
+        boolean ignoreDepOwnership,
+        boolean skipStore,
+        IgniteDataStreamer.Updater<K, V> updater) {
+        this.ctx = ctx;
+        this.log = log;
+
+        assert col != null && !col.isEmpty();
+        assert updater != null;
+
+        this.cacheName = cacheName;
+        this.col = col;
+        this.ignoreDepOwnership = ignoreDepOwnership;
+        this.skipStore = skipStore;
+        this.updater = updater;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object call() throws Exception {
+        if (log.isDebugEnabled())
+            log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", 
size=" + col.size() + ']');
+
+//        TODO IGNITE-77: restore adapter usage.
+//        GridCacheAdapter<Object, Object> cache = 
ctx.cache().internalCache(cacheName);
+//
+//        IgniteFuture<?> f = cache.context().preloader().startFuture();
+//
+//        if (!f.isDone())
+//            f.get();
+//
+//        if (ignoreDepOwnership)
+//            cache.context().deploy().ignoreOwnership(true);
+
+        IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName);
+
+        if (skipStore)
+            cache = (IgniteCacheProxy<K, V>)cache.withSkipStore();
+
+        if (ignoreDepOwnership)
+            cache.context().deploy().ignoreOwnership(true);
+
+        try {
+            updater.update(cache, col);
+
+            return null;
+        }
+        finally {
+            if (ignoreDepOwnership)
+                cache.context().deploy().ignoreOwnership(false);
+
+            if (log.isDebugEnabled())
+                log.debug("Update job finished on node: " + ctx.localNodeId());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index d585352..15309bb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -314,7 +314,7 @@ public class IgfsDataManager extends IgfsManager {
         if (cfg.getPerNodeParallelBatchCount() > 0)
             
ldr.perNodeParallelLoadOperations(cfg.getPerNodeParallelBatchCount());
 
-        ldr.updater(GridDataLoadCacheUpdaters.<IgfsBlockKey, 
byte[]>batchedSorted());
+        ldr.updater(IgniteDataStreamerCacheUpdaters.<IgfsBlockKey, 
byte[]>batchedSorted());
 
         return ldr;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
index b0d8625..306e615 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
@@ -36,7 +36,7 @@ import static org.apache.ignite.cache.CacheMode.*;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
 
 /**
- * Tests for {@code GridDataLoaderImpl}.
+ * Tests for {@code IgniteDataStreamerImpl}.
  */
 public class IgniteDataStreamerImplSelfTest extends GridCommonAbstractTest {
     /** IP finder. */
@@ -69,7 +69,7 @@ public class IgniteDataStreamerImplSelfTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testNullPointerExceptionUponDataLoaderClosing() throws 
Exception {
+    public void testNullPointerExceptionUponDataStreamerClosing() throws 
Exception {
         try {
             startGrids(5);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
index b3dd71b..22a1f97 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
@@ -140,7 +140,7 @@ public class IgniteDataStreamerPerformanceTest extends 
GridCommonAbstractTest {
             final IgniteDataStreamer<Integer, String> ldr = 
ignite.dataStreamer(null);
 
             ldr.perNodeBufferSize(8192);
-            ldr.updater(GridDataLoadCacheUpdaters.<Integer, 
String>batchedSorted());
+            ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, 
String>batchedSorted());
             ldr.autoFlushFrequency(0);
 
             final LongAdder cnt = new LongAdder();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
index 8eefebf..23a46e5 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
@@ -124,7 +124,7 @@ public class IgniteDataStreamerProcessorSelfTest extends 
GridCommonAbstractTest
     public void testPartitioned() throws Exception {
         mode = PARTITIONED;
 
-        checkDataLoader();
+        checkDataStreamer();
     }
 
     /**
@@ -134,7 +134,7 @@ public class IgniteDataStreamerProcessorSelfTest extends 
GridCommonAbstractTest
         mode = PARTITIONED;
         nearEnabled = false;
 
-        checkDataLoader();
+        checkDataStreamer();
     }
 
     /**
@@ -143,7 +143,7 @@ public class IgniteDataStreamerProcessorSelfTest extends 
GridCommonAbstractTest
     public void testReplicated() throws Exception {
         mode = REPLICATED;
 
-        checkDataLoader();
+        checkDataStreamer();
     }
 
     /**
@@ -153,7 +153,7 @@ public class IgniteDataStreamerProcessorSelfTest extends 
GridCommonAbstractTest
         mode = LOCAL;
 
         try {
-            checkDataLoader();
+            checkDataStreamer();
 
             assert false;
         }
@@ -167,7 +167,7 @@ public class IgniteDataStreamerProcessorSelfTest extends 
GridCommonAbstractTest
      * @throws Exception If failed.
      */
     @SuppressWarnings("ErrorNotRethrown")
-    private void checkDataLoader() throws Exception {
+    private void checkDataStreamer() throws Exception {
         try {
             Ignite g1 = startGrid(1);
 
@@ -178,7 +178,7 @@ public class IgniteDataStreamerProcessorSelfTest extends 
GridCommonAbstractTest
 
             final IgniteDataStreamer<Integer, Integer> ldr = 
g1.dataStreamer(null);
 
-            ldr.updater(GridDataLoadCacheUpdaters.<Integer, 
Integer>batchedSorted());
+            ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, 
Integer>batchedSorted());
 
             final AtomicInteger idxGen = new AtomicInteger();
             final int cnt = 400;
@@ -220,7 +220,7 @@ public class IgniteDataStreamerProcessorSelfTest extends 
GridCommonAbstractTest
 
             final IgniteDataStreamer<Integer, Integer> rmvLdr = 
g2.dataStreamer(null);
 
-            rmvLdr.updater(GridDataLoadCacheUpdaters.<Integer, 
Integer>batchedSorted());
+            rmvLdr.updater(IgniteDataStreamerCacheUpdaters.<Integer, 
Integer>batchedSorted());
 
             final CountDownLatch l2 = new CountDownLatch(threads);
 
@@ -265,7 +265,7 @@ public class IgniteDataStreamerProcessorSelfTest extends 
GridCommonAbstractTest
     public void testPartitionedIsolated() throws Exception {
         mode = PARTITIONED;
 
-        checkIsolatedDataLoader();
+        checkIsolatedDataStreamer();
     }
 
     /**
@@ -274,13 +274,13 @@ public class IgniteDataStreamerProcessorSelfTest extends 
GridCommonAbstractTest
     public void testReplicatedIsolated() throws Exception {
         mode = REPLICATED;
 
-        checkIsolatedDataLoader();
+        checkIsolatedDataStreamer();
     }
 
     /**
      * @throws Exception If failed.
      */
-    private void checkIsolatedDataLoader() throws Exception {
+    private void checkIsolatedDataStreamer() throws Exception {
         try {
             useCache = true;
 
@@ -418,7 +418,7 @@ public class IgniteDataStreamerProcessorSelfTest extends 
GridCommonAbstractTest
             // Get and configure loader.
             final IgniteDataStreamer<Integer, Integer> ldr = 
g1.dataStreamer(null);
 
-            ldr.updater(GridDataLoadCacheUpdaters.<Integer, 
Integer>individual());
+            ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, 
Integer>individual());
             ldr.perNodeBufferSize(2);
 
             // Define count of puts.

Reply via email to