IGNITE-9655-merge - Data loader implementation with allowOverwrite flag.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ff2da209
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ff2da209
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ff2da209

Branch: refs/heads/ignite-9655-merge
Commit: ff2da2090d8296884e6a7c7ccfcb17156aaf3d91
Parents: 270246d
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Thu Feb 12 16:26:54 2015 -0800
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Thu Feb 12 16:26:54 2015 -0800

----------------------------------------------------------------------
 .../datagrid/CacheDataLoaderExample.java        |   1 -
 .../org/apache/ignite/IgniteDataLoader.java     |  22 ++--
 .../affinity/GridAffinityProcessor.java         |  36 ++++++-
 .../dataload/IgniteDataLoaderImpl.java          | 101 +++++++++++++++----
 .../GridDataLoaderProcessorSelfTest.java        |  95 ++++++++++++++++-
 5 files changed, 218 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff2da209/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
index 64fa179..60b2bc5 100644
--- 
a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
+++ 
b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
@@ -67,7 +67,6 @@ public class CacheDataLoaderExample {
                 // Configure loader.
                 ldr.perNodeBufferSize(1024);
                 ldr.perNodeParallelLoadOperations(8);
-                ldr.isolated(true);
 
                 for (int i = 0; i < ENTRY_COUNT; i++) {
                     ldr.addData(i, Integer.toString(i));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff2da209/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
index 89fd324..3cff287 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
@@ -70,11 +70,11 @@ import java.util.*;
  *      changing), but it won't be lost anyway. Disabled by default (default 
value is {@code 0}).
  *  </li>
  *  <li>
- *      {@link #isolated(boolean)} - defines if data loader will assume that 
there are no other concurrent
+ *      {@link #allowOverwrite(boolean)} - defines if data loader will assume 
that there are no other concurrent
  *      updates and allow data loader choose most optimal concurrent 
implementation.
  *  </li>
  *  <li>
- *      {@link #updater(org.apache.ignite.IgniteDataLoader.Updater)} - defines 
how cache will be updated with loaded entries.
+ *      {@link #updater(IgniteDataLoader.Updater)} - defines how cache will be 
updated with loaded entries.
  *      It allows to provide user-defined custom logic to update the cache in 
the most effective and flexible way.
  *  </li>
  *  <li>
@@ -103,21 +103,21 @@ public interface IgniteDataLoader<K, V> extends 
AutoCloseable {
 
     /**
      * Gets flag value indicating that this data loader assumes that there are 
no other concurrent updates to the cache.
-     * Default is {@code false}.
+     * Default is {@code true}.
      *
      * @return Flag value.
      */
-    public boolean isolated();
+    public boolean allowOverwrite();
 
     /**
      * Sets flag indicating that this data loader should assume that there are 
no other concurrent updates to the cache.
-     * Should not be used when custom cache updater set using {@link 
#updater(org.apache.ignite.IgniteDataLoader.Updater)} method.
-     * Default is {@code false}.
+     * Should not be used when custom cache updater set using {@link 
#updater(IgniteDataLoader.Updater)} method.
+     * Default is {@code true}. When this flag is set, updates will not be 
propagated to the cache store.
      *
-     * @param isolated Flag value.
+     * @param allowOverwrite Flag value.
      * @throws IgniteException If failed.
      */
-    public void isolated(boolean isolated) throws IgniteException;
+    public void allowOverwrite(boolean allowOverwrite) throws IgniteException;
 
     /**
      * Gets flag indicating that write-through behavior should be disabled for 
data loading.
@@ -343,7 +343,7 @@ public interface IgniteDataLoader<K, V> extends 
AutoCloseable {
      *
      * @param cancel {@code True} to cancel ongoing loading operations.
      * @throws IgniteException If failed to map key to node.
-     * @throws org.apache.ignite.IgniteInterruptedException If thread has been 
interrupted.
+     * @throws IgniteInterruptedException If thread has been interrupted.
      */
     public void close(boolean cancel) throws IgniteException, 
IgniteInterruptedException;
 
@@ -359,7 +359,7 @@ public interface IgniteDataLoader<K, V> extends 
AutoCloseable {
     @Override public void close() throws IgniteException, 
IgniteInterruptedException;
 
     /**
-     * Updates cache with batch of entries. Usually it is enough to configure 
{@link IgniteDataLoader#isolated(boolean)}
+     * Updates cache with batch of entries. Usually it is enough to configure 
{@link IgniteDataLoader#allowOverwrite(boolean)}
      * property and appropriate internal cache updater will be chosen 
automatically. But in some cases to achieve best
      * performance custom user-defined implementation may help.
      * <p>
@@ -372,7 +372,7 @@ public interface IgniteDataLoader<K, V> extends 
AutoCloseable {
          *
          * @param cache Cache.
          * @param entries Collection of entries.
-         * @throws org.apache.ignite.IgniteException If failed.
+         * @throws IgniteException If failed.
          */
         public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, 
V>> entries) throws IgniteException;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff2da209/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index cf268fd..d7d0391 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -174,6 +174,29 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Map single key to primary and backup nodes.
+     *
+     * @param cacheName Cache name.
+     * @param key Key to map.
+     * @return Affinity nodes, primary first.
+     * @throws IgniteCheckedException If failed.
+     */
+    public <K> List<ClusterNode> mapKeyToPrimaryAndBackups(@Nullable String 
cacheName, K key) throws IgniteCheckedException {
+        A.notNull(key, "key");
+
+        ClusterNode loc = ctx.discovery().localNode();
+
+        if (U.hasCache(loc, cacheName) && 
ctx.cache().cache(cacheName).configuration().getCacheMode() == LOCAL)
+            return Collections.singletonList(loc);
+
+        long topVer = ctx.discovery().topologyVersion();
+
+        AffinityInfo affInfo = affinityCache(cacheName, topVer);
+
+        return primaryAndBackups(affInfo, key);
+    }
+
+    /**
      * Maps single key to a node on default cache.
      *
      * @param key Key to map.
@@ -213,7 +236,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
      * @return Cache affinity.
      */
     public <K> CacheAffinityProxy<K> affinityProxy(String cacheName) {
-        return new CacheAffinityProxy(cacheName);
+        return new CacheAffinityProxy<>(cacheName);
     }
 
     /**
@@ -458,6 +481,17 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
         return nodes.iterator().next();
     }
 
+    /**
+     * @param aff Affinity function.
+     * @param key Key to check.
+     * @return Primary and backup nodes.
+     */
+    private <K> List<ClusterNode> primaryAndBackups(AffinityInfo aff, K key) {
+        int part = aff.affFunc.partition(aff.mapper.affinityKey(key));
+
+        return aff.assignment.get(part);
+    }
+
     /** {@inheritDoc} */
     @Override public void printMemoryStats() {
         X.println(">>>");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff2da209/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
index c7002f8..5fcaac1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.dataload;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
@@ -26,7 +25,11 @@ import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.processors.dr.*;
 import org.apache.ignite.internal.processors.portable.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
@@ -51,9 +54,13 @@ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 /**
  * Data loader implementation.
  */
+@SuppressWarnings("unchecked")
 public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, 
Delayed {
+    /** Isolated updater. */
+    private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();
+
     /** Cache updater. */
-    private Updater<K, V> updater = GridDataLoadCacheUpdaters.individual();
+    private Updater<K, V> updater = ISOLATED_UPDATER;
 
     /** */
     private byte[] updaterBytes;
@@ -278,13 +285,13 @@ public class IgniteDataLoaderImpl<K, V> implements 
IgniteDataLoader<K, V>, Delay
     }
 
     /** {@inheritDoc} */
-    @Override public boolean isolated() {
-        return updater != GridDataLoadCacheUpdaters.individual();
+    @Override public boolean allowOverwrite() {
+        return updater != ISOLATED_UPDATER;
     }
 
     /** {@inheritDoc} */
-    @Override public void isolated(boolean isolated) {
-        if (isolated())
+    @Override public void allowOverwrite(boolean allow) {
+        if (allow == allowOverwrite())
             return;
 
         ClusterNode node = 
F.first(ctx.grid().forCacheNodes(cacheName).nodes());
@@ -292,13 +299,7 @@ public class IgniteDataLoaderImpl<K, V> implements 
IgniteDataLoader<K, V>, Delay
         if (node == null)
             throw new IgniteException("Failed to get node for cache: " + 
cacheName);
 
-        GridCacheAttributes a = U.cacheAttributes(node, cacheName);
-
-        assert a != null;
-
-        updater = a.atomicityMode() == CacheAtomicityMode.ATOMIC ?
-            GridDataLoadCacheUpdaters.<K, V>batched() :
-            GridDataLoadCacheUpdaters.<K, V>groupLocked();
+        updater = allow ? GridDataLoadCacheUpdaters.<K, V>individual() : 
ISOLATED_UPDATER;
     }
 
     /** {@inheritDoc} */
@@ -444,7 +445,7 @@ public class IgniteDataLoaderImpl<K, V> implements 
IgniteDataLoader<K, V>, Delay
         boolean initPda = ctx.deploy().enabled() && jobPda == null;
 
         for (Map.Entry<K, V> entry : entries) {
-            ClusterNode node;
+            List<ClusterNode> nodes;
 
             try {
                 K key = entry.getKey();
@@ -457,7 +458,7 @@ public class IgniteDataLoaderImpl<K, V> implements 
IgniteDataLoader<K, V>, Delay
                     initPda = false;
                 }
 
-                node = ctx.affinity().mapKeyToNode(cacheName, key);
+                nodes = nodes(key);
             }
             catch (IgniteCheckedException e) {
                 resFut.onDone(e);
@@ -465,20 +466,22 @@ public class IgniteDataLoaderImpl<K, V> implements 
IgniteDataLoader<K, V>, Delay
                 return;
             }
 
-            if (node == null) {
-                resFut.onDone(new ClusterTopologyCheckedException("Failed to 
map key to node " +
+            if (F.isEmpty(nodes)) {
+                resFut.onDone(new ClusterTopologyException("Failed to map key 
to node " +
                     "(no nodes with cache found in topology) [infos=" + 
entries.size() +
                     ", cacheName=" + cacheName + ']'));
 
                 return;
             }
 
-            Collection<Map.Entry<K, V>> col = mappings.get(node);
+            for (ClusterNode node : nodes) {
+                Collection<Map.Entry<K, V>> col = mappings.get(node);
 
-            if (col == null)
-                mappings.put(node, col = new ArrayList<>());
+                if (col == null)
+                    mappings.put(node, col = new ArrayList<>());
 
-            col.add(entry);
+                col.add(entry);
+            }
         }
 
         for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : 
mappings.entrySet()) {
@@ -552,6 +555,18 @@ public class IgniteDataLoaderImpl<K, V> implements 
IgniteDataLoader<K, V>, Delay
     }
 
     /**
+     * @param key Key to map.
+     * @return Nodes to send requests to.
+     * @throws IgniteCheckedException If failed.
+     */
+    private List<ClusterNode> nodes(K key) throws IgniteCheckedException {
+        GridAffinityProcessor aff = ctx.affinity();
+
+        return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, 
key) :
+            Collections.singletonList(aff.mapKeyToNode(cacheName, key));
+    }
+
+    /**
      * Performs flush.
      *
      * @throws IgniteCheckedException If failed.
@@ -1365,4 +1380,48 @@ public class IgniteDataLoaderImpl<K, V> implements 
IgniteDataLoader<K, V>, Delay
             }
         }
     }
+
+    /**
+     * Isolated updater which only loads entry initial value.
+     */
+    private static class IsolatedUpdater<K, V> implements Updater<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) {
+            IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)cache;
+
+            GridCacheAdapter<K, V> internalCache = proxy.context().cache();
+
+            if (internalCache.isNear())
+                internalCache = internalCache.context().near().dht();
+
+            GridCacheContext<K, V> cctx = internalCache.context();
+
+            long topVer = cctx.affinity().affinityTopologyVersion();
+
+            GridCacheVersion ver = cctx.versions().next(topVer);
+
+            for (Map.Entry<K, V> e : entries) {
+                try {
+                    GridCacheEntryEx<K, V> entry = 
internalCache.entryEx(e.getKey(), topVer);
+
+                    entry.unswap(true, false);
+
+                    entry.initialValue(e.getValue(), null, ver, 0, 0, false, 
topVer, GridDrType.DR_LOAD);
+
+                    cctx.evicts().touch(entry, topVer);
+                }
+                catch (GridDhtInvalidPartitionException | 
GridCacheEntryRemovedException ignored) {
+                    // No-op.
+                }
+                catch (IgniteCheckedException ex) {
+                    IgniteLogger log = cache.unwrap(Ignite.class).log();
+
+                    U.error(log, "Failed to set initial value for cache entry: 
" + e, ex);
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff2da209/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
index 20a316d..6a4bb68 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
@@ -19,12 +19,16 @@ package org.apache.ignite.internal.processors.dataload;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.eviction.fifo.*;
+import org.apache.ignite.cache.GridCache;
+import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.optimized.*;
 import org.apache.ignite.spi.discovery.tcp.*;
@@ -100,8 +104,6 @@ public class GridDataLoaderProcessorSelfTest extends 
GridCommonAbstractTest {
             cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : 
PARTITIONED_ONLY);
             cc.setWriteSynchronizationMode(FULL_SYNC);
 
-            cc.setEvictionPolicy(new CacheFifoEvictionPolicy(10000));
-
             cc.setEvictSynchronized(false);
             cc.setEvictNearSynchronized(false);
 
@@ -283,6 +285,88 @@ public class GridDataLoaderProcessorSelfTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionedIsolated() throws Exception {
+        mode = PARTITIONED;
+
+        checkIsolatedDataLoader();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicatedIsolated() throws Exception {
+        mode = REPLICATED;
+
+        checkIsolatedDataLoader();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkIsolatedDataLoader() throws Exception {
+        try {
+            useCache = true;
+
+            Ignite g1 = startGrid(0);
+            startGrid(1);
+            startGrid(2);
+
+            awaitPartitionMapExchange();
+
+            GridCache<Integer, Integer> cache = grid(0).cache(null);
+
+            for (int i = 0; i < 100; i++)
+                cache.put(i, -1);
+
+            final int cnt = 40_000;
+            final int threads = 10;
+
+            try (final IgniteDataLoader<Integer, Integer> ldr = 
g1.dataLoader(null)) {
+                final AtomicInteger idxGen = new AtomicInteger();
+
+                IgniteInternalFuture<?> f1 = multithreadedAsync(new 
Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        for (int i = 0; i < cnt; i++) {
+                            int idx = idxGen.getAndIncrement();
+
+                            ldr.addData(idx, idx);
+                        }
+
+                        return null;
+                    }
+                }, threads);
+
+                f1.get();
+            }
+
+            for (int g = 0; g < 3; g++) {
+                ClusterNode locNode = grid(g).localNode();
+
+                GridCacheAdapter<Integer, Integer> cache0 = 
((IgniteKernal)grid(g)).internalCache(null);
+
+                if (cache0.isNear())
+                    cache0 = ((GridNearCacheAdapter<Integer, 
Integer>)cache0).dht();
+
+                CacheAffinity<Integer> aff = cache0.affinity();
+
+                for (int key = 0; key < cnt * threads; key++) {
+                    if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, 
key)) {
+                        GridCacheEntryEx<Integer, Integer> entry = 
cache0.peekEx(key);
+
+                        assertNotNull("Missing entry for key: " + key, entry);
+                        assertEquals((Integer)(key < 100 ? -1 : key), 
entry.rawGetOrUnmarshal(false));
+                    }
+                }
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
      * Test primitive arrays can be passed into data loader.
      *
      * @throws Exception If failed.
@@ -741,6 +825,7 @@ public class GridDataLoaderProcessorSelfTest extends 
GridCommonAbstractTest {
 
             ldr.perNodeBufferSize(10);
             ldr.autoFlushFrequency(3000);
+            ldr.allowOverwrite(true);
 
             for (int i = 0; i < 9; i++)
                 ldr.addData(i, i);
@@ -782,6 +867,8 @@ public class GridDataLoaderProcessorSelfTest extends 
GridCommonAbstractTest {
                 storeMap.put(i, i);
 
             try (IgniteDataLoader<Object, Object> ldr = 
ignite.dataLoader(null)) {
+                ldr.allowOverwrite(true);
+
                 assertFalse(ldr.skipStore());
 
                 for (int i = 0; i < 1000; i++)
@@ -798,6 +885,8 @@ public class GridDataLoaderProcessorSelfTest extends 
GridCommonAbstractTest {
                 assertEquals(i, storeMap.get(i));
 
             try (IgniteDataLoader<Object, Object> ldr = 
ignite.dataLoader(null)) {
+                ldr.allowOverwrite(true);
+
                 ldr.skipStore(true);
 
                 for (int i = 0; i < 1000; i++)

Reply via email to