IGNITE-9655-merge - Data loader implementation with allowOverwrite flag.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ff2da209 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ff2da209 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ff2da209 Branch: refs/heads/ignite-9655-merge Commit: ff2da2090d8296884e6a7c7ccfcb17156aaf3d91 Parents: 270246d Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Thu Feb 12 16:26:54 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Thu Feb 12 16:26:54 2015 -0800 ---------------------------------------------------------------------- .../datagrid/CacheDataLoaderExample.java | 1 - .../org/apache/ignite/IgniteDataLoader.java | 22 ++-- .../affinity/GridAffinityProcessor.java | 36 ++++++- .../dataload/IgniteDataLoaderImpl.java | 101 +++++++++++++++---- .../GridDataLoaderProcessorSelfTest.java | 95 ++++++++++++++++- 5 files changed, 218 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff2da209/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java index 64fa179..60b2bc5 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java @@ -67,7 +67,6 @@ public class CacheDataLoaderExample { // Configure loader. ldr.perNodeBufferSize(1024); ldr.perNodeParallelLoadOperations(8); - ldr.isolated(true); for (int i = 0; i < ENTRY_COUNT; i++) { ldr.addData(i, Integer.toString(i)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff2da209/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java index 89fd324..3cff287 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java @@ -70,11 +70,11 @@ import java.util.*; * changing), but it won't be lost anyway. Disabled by default (default value is {@code 0}). * </li> * <li> - * {@link #isolated(boolean)} - defines if data loader will assume that there are no other concurrent + * {@link #allowOverwrite(boolean)} - defines if data loader will assume that there are no other concurrent * updates and allow data loader choose most optimal concurrent implementation. * </li> * <li> - * {@link #updater(org.apache.ignite.IgniteDataLoader.Updater)} - defines how cache will be updated with loaded entries. + * {@link #updater(IgniteDataLoader.Updater)} - defines how cache will be updated with loaded entries. * It allows to provide user-defined custom logic to update the cache in the most effective and flexible way. * </li> * <li> @@ -103,21 +103,21 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable { /** * Gets flag value indicating that this data loader assumes that there are no other concurrent updates to the cache. - * Default is {@code false}. + * Default is {@code true}. * * @return Flag value. */ - public boolean isolated(); + public boolean allowOverwrite(); /** * Sets flag indicating that this data loader should assume that there are no other concurrent updates to the cache. - * Should not be used when custom cache updater set using {@link #updater(org.apache.ignite.IgniteDataLoader.Updater)} method. - * Default is {@code false}. + * Should not be used when custom cache updater set using {@link #updater(IgniteDataLoader.Updater)} method. + * Default is {@code true}. When this flag is set, updates will not be propagated to the cache store. * - * @param isolated Flag value. + * @param allowOverwrite Flag value. * @throws IgniteException If failed. */ - public void isolated(boolean isolated) throws IgniteException; + public void allowOverwrite(boolean allowOverwrite) throws IgniteException; /** * Gets flag indicating that write-through behavior should be disabled for data loading. @@ -343,7 +343,7 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable { * * @param cancel {@code True} to cancel ongoing loading operations. * @throws IgniteException If failed to map key to node. - * @throws org.apache.ignite.IgniteInterruptedException If thread has been interrupted. + * @throws IgniteInterruptedException If thread has been interrupted. */ public void close(boolean cancel) throws IgniteException, IgniteInterruptedException; @@ -359,7 +359,7 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable { @Override public void close() throws IgniteException, IgniteInterruptedException; /** - * Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataLoader#isolated(boolean)} + * Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataLoader#allowOverwrite(boolean)} * property and appropriate internal cache updater will be chosen automatically. But in some cases to achieve best * performance custom user-defined implementation may help. * <p> @@ -372,7 +372,7 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable { * * @param cache Cache. * @param entries Collection of entries. - * @throws org.apache.ignite.IgniteException If failed. + * @throws IgniteException If failed. */ public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff2da209/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index cf268fd..d7d0391 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -174,6 +174,29 @@ public class GridAffinityProcessor extends GridProcessorAdapter { } /** + * Map single key to primary and backup nodes. + * + * @param cacheName Cache name. + * @param key Key to map. + * @return Affinity nodes, primary first. + * @throws IgniteCheckedException If failed. + */ + public <K> List<ClusterNode> mapKeyToPrimaryAndBackups(@Nullable String cacheName, K key) throws IgniteCheckedException { + A.notNull(key, "key"); + + ClusterNode loc = ctx.discovery().localNode(); + + if (U.hasCache(loc, cacheName) && ctx.cache().cache(cacheName).configuration().getCacheMode() == LOCAL) + return Collections.singletonList(loc); + + long topVer = ctx.discovery().topologyVersion(); + + AffinityInfo affInfo = affinityCache(cacheName, topVer); + + return primaryAndBackups(affInfo, key); + } + + /** * Maps single key to a node on default cache. * * @param key Key to map. @@ -213,7 +236,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * @return Cache affinity. */ public <K> CacheAffinityProxy<K> affinityProxy(String cacheName) { - return new CacheAffinityProxy(cacheName); + return new CacheAffinityProxy<>(cacheName); } /** @@ -458,6 +481,17 @@ public class GridAffinityProcessor extends GridProcessorAdapter { return nodes.iterator().next(); } + /** + * @param aff Affinity function. + * @param key Key to check. + * @return Primary and backup nodes. + */ + private <K> List<ClusterNode> primaryAndBackups(AffinityInfo aff, K key) { + int part = aff.affFunc.partition(aff.mapper.affinityKey(key)); + + return aff.assignment.get(part); + } + /** {@inheritDoc} */ @Override public void printMemoryStats() { X.println(">>>"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff2da209/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index c7002f8..5fcaac1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.dataload; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; @@ -26,7 +25,11 @@ import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.processors.dr.*; import org.apache.ignite.internal.processors.portable.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; @@ -51,9 +54,13 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; /** * Data loader implementation. */ +@SuppressWarnings("unchecked") public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delayed { + /** Isolated updater. */ + private static final Updater ISOLATED_UPDATER = new IsolatedUpdater(); + /** Cache updater. */ - private Updater<K, V> updater = GridDataLoadCacheUpdaters.individual(); + private Updater<K, V> updater = ISOLATED_UPDATER; /** */ private byte[] updaterBytes; @@ -278,13 +285,13 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay } /** {@inheritDoc} */ - @Override public boolean isolated() { - return updater != GridDataLoadCacheUpdaters.individual(); + @Override public boolean allowOverwrite() { + return updater != ISOLATED_UPDATER; } /** {@inheritDoc} */ - @Override public void isolated(boolean isolated) { - if (isolated()) + @Override public void allowOverwrite(boolean allow) { + if (allow == allowOverwrite()) return; ClusterNode node = F.first(ctx.grid().forCacheNodes(cacheName).nodes()); @@ -292,13 +299,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay if (node == null) throw new IgniteException("Failed to get node for cache: " + cacheName); - GridCacheAttributes a = U.cacheAttributes(node, cacheName); - - assert a != null; - - updater = a.atomicityMode() == CacheAtomicityMode.ATOMIC ? - GridDataLoadCacheUpdaters.<K, V>batched() : - GridDataLoadCacheUpdaters.<K, V>groupLocked(); + updater = allow ? GridDataLoadCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER; } /** {@inheritDoc} */ @@ -444,7 +445,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay boolean initPda = ctx.deploy().enabled() && jobPda == null; for (Map.Entry<K, V> entry : entries) { - ClusterNode node; + List<ClusterNode> nodes; try { K key = entry.getKey(); @@ -457,7 +458,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay initPda = false; } - node = ctx.affinity().mapKeyToNode(cacheName, key); + nodes = nodes(key); } catch (IgniteCheckedException e) { resFut.onDone(e); @@ -465,20 +466,22 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay return; } - if (node == null) { - resFut.onDone(new ClusterTopologyCheckedException("Failed to map key to node " + + if (F.isEmpty(nodes)) { + resFut.onDone(new ClusterTopologyException("Failed to map key to node " + "(no nodes with cache found in topology) [infos=" + entries.size() + ", cacheName=" + cacheName + ']')); return; } - Collection<Map.Entry<K, V>> col = mappings.get(node); + for (ClusterNode node : nodes) { + Collection<Map.Entry<K, V>> col = mappings.get(node); - if (col == null) - mappings.put(node, col = new ArrayList<>()); + if (col == null) + mappings.put(node, col = new ArrayList<>()); - col.add(entry); + col.add(entry); + } } for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) { @@ -552,6 +555,18 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay } /** + * @param key Key to map. + * @return Nodes to send requests to. + * @throws IgniteCheckedException If failed. + */ + private List<ClusterNode> nodes(K key) throws IgniteCheckedException { + GridAffinityProcessor aff = ctx.affinity(); + + return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, key) : + Collections.singletonList(aff.mapKeyToNode(cacheName, key)); + } + + /** * Performs flush. * * @throws IgniteCheckedException If failed. @@ -1365,4 +1380,48 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay } } } + + /** + * Isolated updater which only loads entry initial value. + */ + private static class IsolatedUpdater<K, V> implements Updater<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { + IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)cache; + + GridCacheAdapter<K, V> internalCache = proxy.context().cache(); + + if (internalCache.isNear()) + internalCache = internalCache.context().near().dht(); + + GridCacheContext<K, V> cctx = internalCache.context(); + + long topVer = cctx.affinity().affinityTopologyVersion(); + + GridCacheVersion ver = cctx.versions().next(topVer); + + for (Map.Entry<K, V> e : entries) { + try { + GridCacheEntryEx<K, V> entry = internalCache.entryEx(e.getKey(), topVer); + + entry.unswap(true, false); + + entry.initialValue(e.getValue(), null, ver, 0, 0, false, topVer, GridDrType.DR_LOAD); + + cctx.evicts().touch(entry, topVer); + } + catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) { + // No-op. + } + catch (IgniteCheckedException ex) { + IgniteLogger log = cache.unwrap(Ignite.class).log(); + + U.error(log, "Failed to set initial value for cache entry: " + e, ex); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff2da209/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java index 20a316d..6a4bb68 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java @@ -19,12 +19,16 @@ package org.apache.ignite.internal.processors.dataload; import org.apache.ignite.*; import org.apache.ignite.cache.*; -import org.apache.ignite.cache.eviction.fifo.*; +import org.apache.ignite.cache.GridCache; +import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.optimized.*; import org.apache.ignite.spi.discovery.tcp.*; @@ -100,8 +104,6 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); cc.setWriteSynchronizationMode(FULL_SYNC); - cc.setEvictionPolicy(new CacheFifoEvictionPolicy(10000)); - cc.setEvictSynchronized(false); cc.setEvictNearSynchronized(false); @@ -283,6 +285,88 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testPartitionedIsolated() throws Exception { + mode = PARTITIONED; + + checkIsolatedDataLoader(); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedIsolated() throws Exception { + mode = REPLICATED; + + checkIsolatedDataLoader(); + } + + /** + * @throws Exception If failed. + */ + private void checkIsolatedDataLoader() throws Exception { + try { + useCache = true; + + Ignite g1 = startGrid(0); + startGrid(1); + startGrid(2); + + awaitPartitionMapExchange(); + + GridCache<Integer, Integer> cache = grid(0).cache(null); + + for (int i = 0; i < 100; i++) + cache.put(i, -1); + + final int cnt = 40_000; + final int threads = 10; + + try (final IgniteDataLoader<Integer, Integer> ldr = g1.dataLoader(null)) { + final AtomicInteger idxGen = new AtomicInteger(); + + IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < cnt; i++) { + int idx = idxGen.getAndIncrement(); + + ldr.addData(idx, idx); + } + + return null; + } + }, threads); + + f1.get(); + } + + for (int g = 0; g < 3; g++) { + ClusterNode locNode = grid(g).localNode(); + + GridCacheAdapter<Integer, Integer> cache0 = ((IgniteKernal)grid(g)).internalCache(null); + + if (cache0.isNear()) + cache0 = ((GridNearCacheAdapter<Integer, Integer>)cache0).dht(); + + CacheAffinity<Integer> aff = cache0.affinity(); + + for (int key = 0; key < cnt * threads; key++) { + if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, key)) { + GridCacheEntryEx<Integer, Integer> entry = cache0.peekEx(key); + + assertNotNull("Missing entry for key: " + key, entry); + assertEquals((Integer)(key < 100 ? -1 : key), entry.rawGetOrUnmarshal(false)); + } + } + } + } + finally { + stopAllGrids(); + } + } + + /** * Test primitive arrays can be passed into data loader. * * @throws Exception If failed. @@ -741,6 +825,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { ldr.perNodeBufferSize(10); ldr.autoFlushFrequency(3000); + ldr.allowOverwrite(true); for (int i = 0; i < 9; i++) ldr.addData(i, i); @@ -782,6 +867,8 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { storeMap.put(i, i); try (IgniteDataLoader<Object, Object> ldr = ignite.dataLoader(null)) { + ldr.allowOverwrite(true); + assertFalse(ldr.skipStore()); for (int i = 0; i < 1000; i++) @@ -798,6 +885,8 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { assertEquals(i, storeMap.get(i)); try (IgniteDataLoader<Object, Object> ldr = ignite.dataLoader(null)) { + ldr.allowOverwrite(true); + ldr.skipStore(true); for (int i = 0; i < 1000; i++)