# ignite-42

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b9288ffe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b9288ffe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b9288ffe

Branch: refs/heads/ignite-32
Commit: b9288ffec904eefc70ca49b8b04f1b5ffcd0a3a0
Parents: 55b5a6d
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Jan 22 14:22:21 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Jan 22 15:59:02 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  16 +-
 .../processors/cache/GridCacheStoreManager.java | 261 +++++++++++++++++--
 .../IgniteCacheLoaderWriterAbstractTest.java    |  22 +-
 .../GridDataLoaderProcessorSelfTest.java        |  25 +-
 4 files changed, 277 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9288ffe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index 7992cd3..99d4cde 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -3519,7 +3519,7 @@ public abstract class GridCacheAdapter<K, V> extends 
GridMetadataAwareAdapter im
      */
     private void localLoadAndUpdate(final Collection<? extends K> keys) throws 
IgniteCheckedException {
         try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, 
V>dataLoad().dataLoader(ctx.namex(), false)) {
-            ldr.updater(new SkipStoreUpdater<K, V>());
+            ldr.skipStore(true);
 
             final Collection<Map.Entry<K, V>> col = new 
ArrayList<>(ldr.perNodeBufferSize());
 
@@ -5338,18 +5338,4 @@ public abstract class GridCacheAdapter<K, V> extends 
GridMetadataAwareAdapter im
                 ldr.addData(col);
         }
     }
-
-    /**
-     *
-     */
-    static class SkipStoreUpdater<K, V> implements 
IgniteDataLoadCacheUpdater<K, V> {
-        /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries)
-            throws IgniteCheckedException {
-            cache = cache.flagsOn(SKIP_STORE);
-
-            for (Map.Entry<K, V> e : entries)
-                cache.put(e.getKey(), e.getValue());
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9288ffe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
index 60e2425..5bf2404 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
@@ -579,26 +579,7 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
         }
         else {
             if (store != null) {
-                Collection<Cache.Entry<? extends K, ?>> entries;
-
-                if (convertPortable) {
-                    entries = new ArrayList<>(map.size());
-
-                    for (Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>> e : 
map.entrySet()) {
-                        K k = e.getKey();
-                        Object v = locStore ? e.getValue() : 
e.getValue().get1();
-
-                        entries.add(new CacheEntryImpl<>(
-                            (K)cctx.unwrapPortableIfNeeded(k, false),
-                            (V)cctx.unwrapPortableIfNeeded(v, false)));
-                    }
-                }
-                else {
-                    entries = new ArrayList<>(map.size());
-
-                    for (Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>> e : 
map.entrySet())
-                        entries.add(new CacheEntryImpl<>(e.getKey(), locStore 
? e.getValue() : e.getValue().get1()));
-                }
+                EntriesView entries = new EntriesView(map);
 
                 if (log.isDebugEnabled())
                     log.debug("Storing values in cache store [entries=" + 
entries + ']');
@@ -894,4 +875,244 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
             return ses0 != null ? ses0.cacheName() : null;
         }
     }
+
+    /**
+     *
+     */
+    @SuppressWarnings("unchecked")
+    private class EntriesView extends AbstractCollection<Cache.Entry<? extends 
K, ?>> {
+        /** */
+        private final Map<K, IgniteBiTuple<V, GridCacheVersion>> map;
+
+        /** */
+        private Set<K> rmvd;
+
+        /** */
+        private boolean cleared;
+
+        /**
+         * @param map Map.
+         */
+        private EntriesView(Map<K, IgniteBiTuple<V, GridCacheVersion>> map) {
+            assert map != null;
+
+            this.map = map;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int size() {
+            return cleared ? 0 : (map.size() - (rmvd != null ? rmvd.size() : 
0));
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isEmpty() {
+            return cleared || !iterator().hasNext();
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Only the entry key is examined. An entry is reported as contained when the view
+         * has not been cleared, its key has not been removed through this view and the
+         * backing map holds the key.
+         */
+        @Override public boolean contains(Object o) {
+            if (cleared || !(o instanceof Cache.Entry))
+                return false;
+
+            Cache.Entry<? extends K, ?> e = (Cache.Entry<? extends K, ?>)o;
+
+            // Entries removed through this view must no longer be visible.
+            if (rmvd != null && rmvd.contains(e.getKey()))
+                return false;
+
+            // Same key lookup as remove(Object): converts the key when convertPortable is set.
+            return mapContains(e);
+        }
+
+        /** {@inheritDoc} */
+        @NotNull @Override public Iterator<Cache.Entry<? extends K, ?>> 
iterator() {
+            if (cleared)
+                return F.emptyIterator();
+
+            final Iterator<Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>>> 
it0 = map.entrySet().iterator();
+
+            return new Iterator<Cache.Entry<? extends K, ?>>() {
+                /** */
+                private Cache.Entry<? extends K, ?> cur;
+
+                /** */
+                private Cache.Entry<? extends K, ?> next;
+
+                /**
+                 *
+                 */
+                {
+                    checkNext();
+                }
+
+                /**
+                 * Advances the underlying map iterator to the next entry that has not been
+                 * removed through this view and stores it in {@code next}. Leaves {@code next}
+                 * untouched when the underlying iterator is exhausted.
+                 */
+                private void checkNext() {
+                    while (it0.hasNext()) {
+                        Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>> e = it0.next();
+
+                        K k = e.getKey();
+
+                        // Convert the key before the removal check: removed keys are recorded
+                        // from entries handed out by this iterator, i.e. in converted form.
+                        // NOTE(review): assumes unwrapPortableIfNeeded yields equal keys for
+                        // equal inputs - confirm.
+                        if (convertPortable)
+                            k = (K)cctx.unwrapPortableIfNeeded(k, false);
+
+                        if (rmvd != null && rmvd.contains(k))
+                            continue;
+
+                        Object v = locStore ? e.getValue() : e.getValue().get1();
+
+                        if (convertPortable)
+                            v = cctx.unwrapPortableIfNeeded(v, false);
+
+                        next = new CacheEntryImpl<>(k, v);
+
+                        break;
+                    }
+                }
+
+                @Override public boolean hasNext() {
+                    return next != null;
+                }
+
+                @Override public Cache.Entry<? extends K, ?> next() {
+                    if (next == null)
+                        throw new NoSuchElementException();
+
+                    cur = next;
+
+                    next = null;
+
+                    checkNext();
+
+                    return cur;
+                }
+
+                @Override public void remove() {
+                    if (cur == null)
+                        throw new IllegalStateException();
+
+                    addRemoved(cur);
+
+                    cur = null;
+                }
+            };
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean add(Cache.Entry<? extends K, ?> entry) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean addAll(Collection<? extends Cache.Entry<? 
extends K, ?>> col) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean remove(Object o) {
+            if (cleared || !(o instanceof Cache.Entry))
+                return false;
+
+            Cache.Entry<? extends K, ?> e = (Cache.Entry<? extends K, ?>)o;
+
+            if (rmvd != null && rmvd.contains(e.getKey()))
+                return false;
+
+            if (mapContains(e)) {
+                addRemoved(e);
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Returns {@code true} only if every element of {@code col} is contained in this
+         * view; an empty {@code col} is therefore always contained.
+         */
+        @Override public boolean containsAll(Collection<?> col) {
+            // No special case for a cleared view: contains() already returns false then.
+            for (Object o : col) {
+                if (!contains(o))
+                    return false;
+            }
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean removeAll(Collection<?> col) {
+            if (cleared)
+                return false;
+
+            boolean modified = false;
+
+            for (Object o : col) {
+                 if (remove(o))
+                     modified = true;
+            }
+
+            return modified;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean retainAll(Collection<?> col) {
+            if (cleared)
+                return false;
+
+            boolean modified = false;
+
+            for (Cache.Entry<? extends K, ?> e : this) {
+                if (!col.contains(e)) {
+                    addRemoved(e);
+
+                    modified = true;
+                }
+            }
+
+            return modified;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void clear() {
+            cleared = true;
+        }
+
+        /**
+         * @param e Entry.
+         */
+        private void addRemoved(Cache.Entry<? extends K, ?> e) {
+            if (rmvd == null)
+                rmvd = new HashSet<>();
+
+            rmvd.add(e.getKey());
+        }
+
+        /**
+         * @param e Entry.
+         * @return {@code True} if original map contains entry.
+         */
+        private boolean mapContains(Cache.Entry<? extends K, ?> e) {
+            K key = (K)(convertPortable ? cctx.marshalToPortable(e.getKey()) : 
e.getKey());
+
+            return map.containsKey(key);
+
+        }
+
+        /** {@inheritDoc} */
+        public String toString() {
+            Iterator<Cache.Entry<? extends K, ?>> it = iterator();
+
+            if (!it.hasNext())
+                return "[]";
+
+            SB sb = new SB("[");
+
+            while (true) {
+                Cache.Entry<? extends K, ?> e = it.next();
+
+                sb.a(e.toString());
+
+                if (!it.hasNext())
+                    return sb.a(']').toString();
+
+                sb.a(", ");
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9288ffe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
index 17282f6..64e10c7 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
@@ -325,8 +325,19 @@ public abstract class IgniteCacheLoaderWriterAbstractTest 
extends IgniteCacheAbs
 
             writerCallCnt.incrementAndGet();
 
-            for (Cache.Entry<? extends Integer, ? extends Integer> e : entries)
+            Iterator<Cache.Entry<? extends Integer, ? extends Integer>> it = 
entries.iterator();
+
+            while (it.hasNext()) {
+                Cache.Entry<? extends Integer, ? extends Integer> e = 
it.next();
+
                 storeMap.put(e.getKey(), e.getValue());
+
+                it.remove();
+            }
+
+            assertTrue(entries.isEmpty());
+
+            assertEquals(0, entries.size());
         }
 
         /** {@inheritDoc} */
@@ -352,8 +363,15 @@ public abstract class IgniteCacheLoaderWriterAbstractTest 
extends IgniteCacheAbs
 
             writerCallCnt.incrementAndGet();
 
-            for (Object key : keys)
+            Iterator<?> it = keys.iterator();
+
+            while (it.hasNext()) {
+                Object key = it.next();
+
                 storeMap.remove(key);
+
+                it.remove();
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9288ffe/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java
index aeb86e6..9d415c0 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java
@@ -19,21 +19,22 @@ package org.gridgain.grid.kernal.processors.dataload;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.store.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.optimized.*;
-import org.apache.ignite.transactions.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.eviction.fifo.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.gridgain.grid.cache.store.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.gridgain.testframework.junits.common.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.*;
+import javax.cache.configuration.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -78,7 +79,7 @@ public class GridDataLoaderProcessorSelfTest extends 
GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings({"IfMayBeConditional"})
+    @SuppressWarnings({"IfMayBeConditional", "unchecked"})
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
@@ -105,7 +106,11 @@ public class GridDataLoaderProcessorSelfTest extends 
GridCommonAbstractTest {
             cc.setEvictSynchronized(false);
             cc.setEvictNearSynchronized(false);
 
-            cc.setStore(store);
+            if (store != null) {
+                cc.setCacheStoreFactory(new 
FactoryBuilder.SingletonFactory(store));
+                cc.setReadThrough(true);
+                cc.setWriteThrough(true);
+            }
 
             cfg.setCacheConfiguration(cc);
         }
@@ -780,7 +785,7 @@ public class GridDataLoaderProcessorSelfTest extends 
GridCommonAbstractTest {
                 storeMap.put(i, i);
 
             try (IgniteDataLoader<Object, Object> ldr = 
ignite.dataLoader(null)) {
-                ldr.skipStore(false);
+                assertFalse(ldr.skipStore());
 
                 for (int i = 0; i < 1000; i++)
                     ldr.removeData(i);
@@ -860,19 +865,19 @@ public class GridDataLoaderProcessorSelfTest extends 
GridCommonAbstractTest {
     /**
      *
      */
-    private class TestStore extends GridCacheStoreAdapter<Object, Object> {
+    private class TestStore extends CacheStoreAdapter<Object, Object> {
         /** {@inheritDoc} */
-        @Nullable @Override public Object load(@Nullable IgniteTx tx, Object 
key) throws IgniteCheckedException {
+        @Nullable @Override public Object load(Object key) {
             return storeMap.get(key);
         }
 
         /** {@inheritDoc} */
-        @Override public void put(@Nullable IgniteTx tx, Object key, Object 
val) throws IgniteCheckedException {
-            storeMap.put(key, val);
+        @Override public void write(Cache.Entry<?, ?> entry) {
+            storeMap.put(entry.getKey(), entry.getValue());
         }
 
         /** {@inheritDoc} */
-        @Override public void remove(@Nullable IgniteTx tx, Object key) throws 
IgniteCheckedException {
+        @Override public void delete(Object key) {
             storeMap.remove(key);
         }
     }

Reply via email to