# ignite-42
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b9288ffe Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b9288ffe Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b9288ffe Branch: refs/heads/ignite-32 Commit: b9288ffec904eefc70ca49b8b04f1b5ffcd0a3a0 Parents: 55b5a6d Author: sboikov <sboi...@gridgain.com> Authored: Thu Jan 22 14:22:21 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jan 22 15:59:02 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 16 +- .../processors/cache/GridCacheStoreManager.java | 261 +++++++++++++++++-- .../IgniteCacheLoaderWriterAbstractTest.java | 22 +- .../GridDataLoaderProcessorSelfTest.java | 25 +- 4 files changed, 277 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9288ffe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index 7992cd3..99d4cde 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -3519,7 +3519,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im */ private void localLoadAndUpdate(final Collection<? 
extends K> keys) throws IgniteCheckedException { try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) { - ldr.updater(new SkipStoreUpdater<K, V>()); + ldr.skipStore(true); final Collection<Map.Entry<K, V>> col = new ArrayList<>(ldr.perNodeBufferSize()); @@ -5338,18 +5338,4 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im ldr.addData(col); } } - - /** - * - */ - static class SkipStoreUpdater<K, V> implements IgniteDataLoadCacheUpdater<K, V> { - /** {@inheritDoc} */ - @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) - throws IgniteCheckedException { - cache = cache.flagsOn(SKIP_STORE); - - for (Map.Entry<K, V> e : entries) - cache.put(e.getKey(), e.getValue()); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9288ffe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java index 60e2425..5bf2404 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java @@ -579,26 +579,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { } else { if (store != null) { - Collection<Cache.Entry<? extends K, ?>> entries; - - if (convertPortable) { - entries = new ArrayList<>(map.size()); - - for (Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>> e : map.entrySet()) { - K k = e.getKey(); - Object v = locStore ? 
e.getValue() : e.getValue().get1(); - - entries.add(new CacheEntryImpl<>( - (K)cctx.unwrapPortableIfNeeded(k, false), - (V)cctx.unwrapPortableIfNeeded(v, false))); - } - } - else { - entries = new ArrayList<>(map.size()); - - for (Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>> e : map.entrySet()) - entries.add(new CacheEntryImpl<>(e.getKey(), locStore ? e.getValue() : e.getValue().get1())); - } + EntriesView entries = new EntriesView(map); if (log.isDebugEnabled()) log.debug("Storing values in cache store [entries=" + entries + ']'); @@ -894,4 +875,244 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { return ses0 != null ? ses0.cacheName() : null; } } + + /** + * Collection view over an update map; removals are tracked in {@code rmvd}/{@code cleared} and never modify the map itself. + */ + @SuppressWarnings("unchecked") + private class EntriesView extends AbstractCollection<Cache.Entry<? extends K, ?>> { + /** */ + private final Map<K, IgniteBiTuple<V, GridCacheVersion>> map; + + /** */ + private Set<K> rmvd; + + /** */ + private boolean cleared; + + /** + * @param map Map. + */ + private EntriesView(Map<K, IgniteBiTuple<V, GridCacheVersion>> map) { + assert map != null; + + this.map = map; + } + + /** {@inheritDoc} */ + @Override public int size() { + return cleared ? 0 : (map.size() - (rmvd != null ? rmvd.size() : 0)); + } + + /** {@inheritDoc} */ + @Override public boolean isEmpty() { + return cleared || !iterator().hasNext(); + } + + /** {@inheritDoc} An entry is contained only if its key is in the backing map and was not removed through this view. */ + @Override public boolean contains(Object o) { + if (cleared || !(o instanceof Cache.Entry)) + return false; + + Cache.Entry<? extends K, ?> e = (Cache.Entry<? extends K, ?>)o; + + return (rmvd == null || !rmvd.contains(e.getKey())) && mapContains(e); + } + + /** {@inheritDoc} */ + @NotNull @Override public Iterator<Cache.Entry<? extends K, ?>> iterator() { + if (cleared) + return F.emptyIterator(); + + final Iterator<Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>>> it0 = map.entrySet().iterator(); + + return new Iterator<Cache.Entry<? extends K, ?>>() { + /** */ + private Cache.Entry<? 
extends K, ?> cur; + + /** */ + private Cache.Entry<? extends K, ?> next; + + /** + * + */ + { + checkNext(); + } + + /** + * + */ + private void checkNext() { + while (it0.hasNext()) { + Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>> e = it0.next(); + + K k = e.getKey(); + + if (rmvd != null && rmvd.contains(k)) + continue; + + Object v = locStore ? e.getValue() : e.getValue().get1(); + + if (convertPortable) { + k = (K)cctx.unwrapPortableIfNeeded(k, false); + v = cctx.unwrapPortableIfNeeded(v, false); + } + + next = new CacheEntryImpl<>(k, v); + + break; + } + } + + @Override public boolean hasNext() { + return next != null; + } + + @Override public Cache.Entry<? extends K, ?> next() { + if (next == null) + throw new NoSuchElementException(); + + cur = next; + + next = null; + + checkNext(); + + return cur; + } + + @Override public void remove() { + if (cur == null) + throw new IllegalStateException(); + + addRemoved(cur); + + cur = null; + } + }; + } + + /** {@inheritDoc} */ + @Override public boolean add(Cache.Entry<? extends K, ?> entry) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean addAll(Collection<? extends Cache.Entry<? extends K, ?>> col) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean remove(Object o) { + if (cleared || !(o instanceof Cache.Entry)) + return false; + + Cache.Entry<? extends K, ?> e = (Cache.Entry<? 
extends K, ?>)o; + + if (rmvd != null && rmvd.contains(e.getKey())) + return false; + + if (mapContains(e)) { + addRemoved(e); + + return true; + } + + return false; + } + + /** {@inheritDoc} Returns {@code false} at the first element of {@code col} that this view does not contain. */ + @Override public boolean containsAll(Collection<?> col) { + if (cleared) + return col.isEmpty(); + + for (Object o : col) { + if (!contains(o)) + return false; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean removeAll(Collection<?> col) { + if (cleared) + return false; + + boolean modified = false; + + for (Object o : col) { + if (remove(o)) + modified = true; + } + + return modified; + } + + /** {@inheritDoc} */ + @Override public boolean retainAll(Collection<?> col) { + if (cleared) + return false; + + boolean modified = false; + + for (Cache.Entry<? extends K, ?> e : this) { + if (!col.contains(e)) { + addRemoved(e); + + modified = true; + } + } + + return modified; + } + + /** {@inheritDoc} */ + @Override public void clear() { + cleared = true; + } + + /** + * @param e Entry. + */ + private void addRemoved(Cache.Entry<? extends K, ?> e) { + if (rmvd == null) + rmvd = new HashSet<>(); + + rmvd.add(e.getKey()); + } + + /** + * @param e Entry. + * @return {@code True} if original map contains entry. + */ + private boolean mapContains(Cache.Entry<? extends K, ?> e) { + K key = (K)(convertPortable ? cctx.marshalToPortable(e.getKey()) : e.getKey()); + + return map.containsKey(key); + + } + + /** {@inheritDoc} */ + @Override public String toString() { + Iterator<Cache.Entry<? extends K, ?>> it = iterator(); + + if (!it.hasNext()) + return "[]"; + + SB sb = new SB("["); + + while (true) { + Cache.Entry<? 
extends K, ?> e = it.next(); + + sb.a(e.toString()); + + if (!it.hasNext()) + return sb.a(']').toString(); + + sb.a(", "); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9288ffe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java index 17282f6..64e10c7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java @@ -325,8 +325,19 @@ public abstract class IgniteCacheLoaderWriterAbstractTest extends IgniteCacheAbs writerCallCnt.incrementAndGet(); - for (Cache.Entry<? extends Integer, ? extends Integer> e : entries) + Iterator<Cache.Entry<? extends Integer, ? extends Integer>> it = entries.iterator(); + + while (it.hasNext()) { + Cache.Entry<? extends Integer, ? 
extends Integer> e = it.next(); + storeMap.put(e.getKey(), e.getValue()); + + it.remove(); + } + + assertTrue(entries.isEmpty()); + + assertEquals(0, entries.size()); } /** {@inheritDoc} */ @@ -352,8 +363,15 @@ public abstract class IgniteCacheLoaderWriterAbstractTest extends IgniteCacheAbs writerCallCnt.incrementAndGet(); - for (Object key : keys) + Iterator<?> it = keys.iterator(); + + while (it.hasNext()) { + Object key = it.next(); + storeMap.remove(key); + + it.remove(); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9288ffe/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java index aeb86e6..9d415c0 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java @@ -19,21 +19,22 @@ package org.gridgain.grid.kernal.processors.dataload; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.eviction.fifo.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.gridgain.grid.cache.store.*; import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.testframework.junits.common.*; 
import org.jetbrains.annotations.*; +import javax.cache.*; +import javax.cache.configuration.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -78,7 +79,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @SuppressWarnings({"IfMayBeConditional"}) + @SuppressWarnings({"IfMayBeConditional", "unchecked"}) @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -105,7 +106,11 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { cc.setEvictSynchronized(false); cc.setEvictNearSynchronized(false); - cc.setStore(store); + if (store != null) { + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + cc.setReadThrough(true); + cc.setWriteThrough(true); + } cfg.setCacheConfiguration(cc); } @@ -780,7 +785,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { storeMap.put(i, i); try (IgniteDataLoader<Object, Object> ldr = ignite.dataLoader(null)) { - ldr.skipStore(false); + assertFalse(ldr.skipStore()); for (int i = 0; i < 1000; i++) ldr.removeData(i); @@ -860,19 +865,19 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { /** * */ - private class TestStore extends GridCacheStoreAdapter<Object, Object> { + private class TestStore extends CacheStoreAdapter<Object, Object> { /** {@inheritDoc} */ - @Nullable @Override public Object load(@Nullable IgniteTx tx, Object key) throws IgniteCheckedException { + @Nullable @Override public Object load(Object key) { return storeMap.get(key); } /** {@inheritDoc} */ - @Override public void put(@Nullable IgniteTx tx, Object key, Object val) throws IgniteCheckedException { - storeMap.put(key, val); + @Override public void write(Cache.Entry<?, ?> entry) { + storeMap.put(entry.getKey(), entry.getValue()); } /** {@inheritDoc} */ - @Override public 
void remove(@Nullable IgniteTx tx, Object key) throws IgniteCheckedException { + @Override public void delete(Object key) { storeMap.remove(key); } }