# ignite-42
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/738c67fd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/738c67fd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/738c67fd Branch: refs/heads/ignite-107 Commit: 738c67fdb935e90025c18fa6c5fe428ffa95eb90 Parents: f4b3995 Author: sboikov <sboi...@gridgain.com> Authored: Tue Jan 20 14:22:18 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Jan 20 14:22:18 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 37 ++- .../IgniteNullArgumentCheckedException.java | 32 +++ .../processors/cache/CacheInvokeEntry.java | 17 +- .../processors/cache/GridCacheAdapter.java | 241 ++++++++++++++++--- .../processors/cache/GridCacheStoreManager.java | 195 ++++++++++----- .../dht/atomic/GridDhtAtomicCache.java | 4 + .../IgniteCacheLoadAllAbstractTest.java | 65 +++++ 7 files changed, 486 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738c67fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 1e4426e..8e346e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -552,10 +552,31 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void loadAll(Set<? 
extends K> keys, - boolean replaceExistingValues, - @Nullable CompletionListener completionLsnr) { - // TODO IGNITE-1. - throw new UnsupportedOperationException(); + boolean replaceExisting, + @Nullable final CompletionListener completionLsnr) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + IgniteFuture<?> fut = ctx.cache().loadAll(keys, replaceExisting); + + if (completionLsnr != null) { + fut.listenAsync(new CI1<IgniteFuture<?>>() { + @Override public void apply(IgniteFuture<?> fut) { + try { + fut.get(); + + completionLsnr.onCompletion(); + } + catch (IgniteCheckedException e) { + completionLsnr.onException(cacheException(e)); + } + } + }); + } + } + finally { + gate.leave(prev); + } } /** {@inheritDoc} */ @@ -1094,14 +1115,14 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements * @return Cache exception. */ private CacheException cacheException(IgniteCheckedException e) { - Throwable[] suppressed = e.getSuppressed(); + if (e instanceof GridCachePartialUpdateException) + return new CachePartialUpdateException((GridCachePartialUpdateException)e); + else if (e instanceof IgniteNullArgumentCheckedException) + throw new NullPointerException(e.getMessage()); if (e.getCause() instanceof CacheException) return (CacheException)e.getCause(); - if (e instanceof GridCachePartialUpdateException) - return new CachePartialUpdateException((GridCachePartialUpdateException)e); - return new CacheException(e); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738c67fd/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteNullArgumentCheckedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteNullArgumentCheckedException.java b/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteNullArgumentCheckedException.java new file mode 100644 index 0000000..42cc75a --- /dev/null +++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteNullArgumentCheckedException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.grid.kernal; + +import org.apache.ignite.*; + +/** + * + */ +public class IgniteNullArgumentCheckedException extends IgniteCheckedException { + /** + * @param msg Message. 
+ */ + public IgniteNullArgumentCheckedException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738c67fd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java index ab7dfc4..e44aff1 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java @@ -11,6 +11,7 @@ package org.gridgain.grid.kernal.processors.cache; import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; import javax.cache.processor.*; @@ -29,13 +30,20 @@ public class CacheInvokeEntry<K, V> implements MutableEntry<K, V> { /** */ private boolean modified; + /** */ + private final boolean hadVal; + /** * @param key Key. * @param val Value. */ - public CacheInvokeEntry(K key, V val) { + public CacheInvokeEntry(K key, @Nullable V val) { + assert key != null; + this.key = key; this.val = val; + + hadVal = val != null; } /** {@inheritDoc} */ @@ -76,9 +84,14 @@ public class CacheInvokeEntry<K, V> implements MutableEntry<K, V> { } /** - * @return {@code True} if {@link #setValue} or {@link #remove was called}. + * @return {@code True} if entry was modified. 
*/ public boolean modified() { + if (modified) { + if (!hadVal && val == null) + return false; + } + return modified; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738c67fd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index ae2da3e..8b27501 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -3355,45 +3355,146 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im return ctx.tm().synchronizations(); } - /** {@inheritDoc} */ - @Override public void loadCache(final IgniteBiPredicate<K, V> p, final long ttl, Object[] args) throws IgniteCheckedException { - final boolean replicate = ctx.isDrEnabled(); - final long topVer = ctx.affinity().affinityTopologyVersion(); + /** + * @param keys Keys. + * @param replaceExisting Replace existing values flag. + * @return Load future. + */ + public IgniteFuture<?> loadAll(final Set<? 
extends K> keys, + boolean replaceExisting) { + A.notNull(keys, "keys"); - if (ctx.store().isLocalStore()) { - try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) { - ldr.updater(new GridDrDataLoadCacheUpdater<K, V>()); + if (keys.size() < 10) { + for (K key : keys) { + if (key == null) + throw new NullPointerException(); + } + } - final Collection<Map.Entry<K, V>> col = new ArrayList<>(ldr.perNodeBufferSize()); + if (!ctx.store().configured()) + return new GridFinishedFuture<>(ctx.kernalContext()); - ctx.store().loadCache(new CIX3<K, V, GridCacheVersion>() { - @Override public void applyx(K key, V val, GridCacheVersion ver) throws IgniteCheckedException { - assert ver != null; + if (replaceExisting) { + if (ctx.store().isLocalStore()) { + assert false; - if (p != null && !p.apply(key, val)) - return; + return null; + } + else { + return ctx.closures().callLocalSafe(new Callable<Void>() { + @Override public Void call() throws Exception { + loadAll(keys); - if (ctx.portableEnabled()) { - key = (K)ctx.marshalToPortable(key); - val = (V)ctx.marshalToPortable(val); + return null; + } + }); + } + } + else { + return ctx.closures().callLocalSafe(new Callable<Void>() { + @Override public Void call() throws Exception { + // Version for all loaded entries. + final GridCacheVersion ver0 = ctx.versions().nextForLoad(); + final boolean replicate = ctx.isDrEnabled(); + final long topVer = ctx.affinity().affinityTopologyVersion(); + + ctx.store().loadAllFromStore(null, keys, new CIX2<K, V>() { + @Override public void applyx(K key, V val) + throws PortableException { + if (ctx.portableEnabled()) { + key = (K)ctx.marshalToPortable(key); + val = (V)ctx.marshalToPortable(val); + } + + GridCacheEntryEx<K, V> entry = entryEx(key, false); + + try { + entry.initialValue(val, null, ver0, 0, -1, false, topVer, replicate ? 
DR_LOAD : DR_NONE); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to put cache value: " + entry, e); + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry during loadCache (will ignore): " + entry); + } + finally { + ctx.evicts().touch(entry, topVer); + } + + CU.unwindEvicts(ctx); } + }); - GridVersionedEntry<K,V> e = new GridRawVersionedEntry<>(key, null, val, null, ttl, 0, ver); + return null; + } + }); + } + } - e.marshal(ctx.marshaller()); + /** + * @param keys Keys. + * @throws IgniteCheckedException If failed. + */ + private void loadAllLocalStore(final Set<? extends K> keys) throws IgniteCheckedException { + assert ctx.store().isLocalStore(); - col.add(e); + try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) { + ldr.updater(new GridDrDataLoadCacheUpdater<K, V>()); - if (col.size() == ldr.perNodeBufferSize()) { - ldr.addData(col); + LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, 0); - col.clear(); - } + ctx.store().loadAllFromLocalStore(null, keys, c); + + c.onDone(); + } + } + + /** + * @param keys Keys. + * @throws IgniteCheckedException If failed. + */ + private void loadAll(final Set<? 
extends K> keys) throws IgniteCheckedException { + try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) { + final Collection<Map.Entry<K, V>> col = new ArrayList<>(ldr.perNodeBufferSize()); + + ctx.store().loadAllFromStore(null, keys, new CIX2<K, V>() { + @Override public void applyx(K key, V val) throws IgniteCheckedException { + if (ctx.portableEnabled()) { + key = (K)ctx.marshalToPortable(key); + val = (V)ctx.marshalToPortable(val); } - }, args); - if (!col.isEmpty()) - ldr.addData(col); + col.add(new GridMapEntry<>(key, val)); + + if (col.size() == ldr.perNodeBufferSize()) { + ldr.addData(col); + + col.clear(); + } + } + }); + + if (!col.isEmpty()) + ldr.addData(col); + } + } + + /** {@inheritDoc} */ + @Override public void loadCache(final IgniteBiPredicate<K, V> p, final long ttl, Object[] args) + throws IgniteCheckedException { + final boolean replicate = ctx.isDrEnabled(); + final long topVer = ctx.affinity().affinityTopologyVersion(); + + if (ctx.store().isLocalStore()) { + try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) { + ldr.updater(new GridDrDataLoadCacheUpdater<K, V>()); + + LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, ttl); + + ctx.store().loadCache(c, args); + + c.onDone(); } } else { @@ -3439,8 +3540,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im @Override public IgniteFuture<?> loadCacheAsync(final IgniteBiPredicate<K, V> p, final long ttl, final Object[] args) { return ctx.closures().callLocalSafe( ctx.projectSafe(new Callable<Object>() { - @Nullable - @Override public Object call() throws IgniteCheckedException { + @Nullable @Override public Object call() throws IgniteCheckedException { loadCache(p, ttl, args); return null; @@ -4509,7 +4609,9 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im ctx.denyOnFlags(F.asList(LOCAL, READ)); return 
ctx.closures().callLocalSafe(ctx.projectSafe(new Callable<V>() { - @Nullable @Override public V call() throws IgniteCheckedException { + @Nullable + @Override + public V call() throws IgniteCheckedException { return reload(key, filter); } }), true); @@ -4537,7 +4639,9 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im ctx.denyOnFlag(READ); return ctx.closures().callLocalSafe(ctx.projectSafe(new GPC() { - @Nullable @Override public Object call() throws IgniteCheckedException { + @Nullable + @Override + public Object call() throws IgniteCheckedException { reloadAll(filter); return null; @@ -4547,11 +4651,13 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im /** * @param keys Keys. + * @param deserializePortable Deserialize portable flag. * @param filter Filter to evaluate. * @return Read future. */ public IgniteFuture<Map<K, V>> getAllAsync(@Nullable Collection<? extends K> keys, - boolean deserializePortable, @Nullable IgnitePredicate<GridCacheEntry<K, V>> filter) { + boolean deserializePortable, + @Nullable IgnitePredicate<GridCacheEntry<K, V>> filter) { String taskName = ctx.kernalContext().job().currentTaskName(); if (ctx.portableEnabled() && !F.isEmpty(keys)) { @@ -4562,8 +4668,14 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im }); } - return getAllAsync(keys, ctx.hasFlag(GET_PRIMARY), /*skip tx*/false, null, null, taskName, - deserializePortable, filter); + return getAllAsync(keys, + ctx.hasFlag(GET_PRIMARY), + /*skip tx*/false, + null, + null, + taskName, + deserializePortable, + filter); } /** @@ -5060,4 +5172,67 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im return S.toString(GetExpiryPolicy.class, this); } } + + /** + * + */ + private class LocalStoreLoadClosure extends CIX3<K, V, GridCacheVersion> { + /** */ + final IgniteBiPredicate<K, V> p; + + /** */ + final Collection<Map.Entry<K, V>> col; + + /** */ + final 
IgniteDataLoader<K, V> ldr; + + /** */ + final long ttl; + + /** + * @param p Key/value predicate. + * @param ldr Loader. + * @param ttl TTL. + */ + private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p, IgniteDataLoader<K, V> ldr, long ttl) { + this.p = p; + this.ldr = ldr; + this.ttl = ttl; + + col = new ArrayList<>(ldr.perNodeBufferSize()); + } + + /** {@inheritDoc} */ + @Override public void applyx(K key, V val, GridCacheVersion ver) throws IgniteCheckedException { + assert ver != null; + + if (p != null && !p.apply(key, val)) + return; + + if (ctx.portableEnabled()) { + key = (K)ctx.marshalToPortable(key); + val = (V)ctx.marshalToPortable(val); + } + + GridVersionedEntry<K,V> e = new GridRawVersionedEntry<>(key, null, val, null, ttl, 0, ver); + + e.marshal(ctx.marshaller()); + + col.add(e); + + if (col.size() == ldr.perNodeBufferSize()) { + ldr.addData(col); + + col.clear(); + } + } + + /** + * Adds remaining data to loader. + */ + void onDone() { + if (!col.isEmpty()) + ldr.addData(col); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738c67fd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java index 41a8e64..11e11a3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java @@ -208,7 +208,21 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @return Loaded value, possibly <tt>null</tt>. * @throws IgniteCheckedException If data loading failed. 
*/ + @SuppressWarnings("unchecked") @Nullable public V loadFromStore(@Nullable IgniteTx tx, K key) throws IgniteCheckedException { + return (V)loadFromStore(tx, key, true); + } + + /** + * Loads data from persistent store. + * + * @param tx Cache transaction. + * @param key Cache key. + * @param convert Convert flag. + * @return Loaded value, possibly <tt>null</tt>. + * @throws IgniteCheckedException If data loading failed. + */ + @Nullable public Object loadFromStore(@Nullable IgniteTx tx, K key, boolean convert) throws IgniteCheckedException { if (store != null) { if (key instanceof GridCacheInternal) // Never load internal keys from store as they are never persisted. @@ -220,16 +234,22 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { if (log.isDebugEnabled()) log.debug("Loading value from store for key: " + key); - V val = null; + Object val = null; boolean ses = initSession(tx); try { - val = convert(singleThreadGate.load(key)); + val = singleThreadGate.load(key); } catch (ClassCastException e) { handleClassCastException(e); } + catch (CacheLoaderException e) { + throw new IgniteCheckedException(e); + } + catch (Exception e) { + throw new IgniteCheckedException(new CacheLoaderException(e)); + } finally { if (ses) sesHolder.set(null); @@ -238,7 +258,13 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { if (log.isDebugEnabled()) log.debug("Loaded value from store [key=" + key + ", val=" + val + ']'); - return cctx.portableEnabled() ? (V)cctx.marshalToPortable(val) : val; + if (convert) { + val = convert(val); + + return cctx.portableEnabled() ? cctx.marshalToPortable(val) : val; + } + else + return val; } return null; @@ -263,6 +289,21 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { } /** + * @param tx Cache transaction. + * @param keys Cache keys. + * @param vis Closure to cache loaded elements. + * @throws IgniteCheckedException If data loading failed. 
+ */ + public void loadAllFromLocalStore(@Nullable IgniteTx tx, + Collection<? extends K> keys, + final GridInClosure3<K, V, GridCacheVersion> vis) throws IgniteCheckedException { + assert store != null; + assert locStore; + + loadAllFromStore(null, keys, null, vis); + } + + /** * Loads data from persistent store. * * @param tx Cache transaction. @@ -276,85 +317,115 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { Collection<? extends K> keys, final IgniteBiInClosure<K, V> vis) throws IgniteCheckedException { if (store != null) { - if (!keys.isEmpty()) { - if (keys.size() == 1) { - K key = F.first(keys); + loadAllFromStore(null, keys, vis, null); - vis.apply(key, loadFromStore(tx, key)); + return true; + } + else { + for (K key : keys) + vis.apply(key, null); + } - return true; - } + return false; + } - Collection<? extends K> keys0 = convertPortable ? - F.viewReadOnly(keys, new C1<K, K>() { - @Override public K apply(K k) { - return (K)cctx.unwrapPortableIfNeeded(k, false); - } - }) : - keys; + /** + * @param tx Cache transaction. + * @param keys Keys to load. + * @param vis Key/value closure (only one of vis or verVis can be specified). + * @param verVis Key/value/version closure (only one of vis or verVis can be specified). + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + private void loadAllFromStore(@Nullable IgniteTx tx, + Collection<? 
extends K> keys, + final @Nullable IgniteBiInClosure<K, V> vis, + final @Nullable GridInClosure3<K, V, GridCacheVersion> verVis) throws IgniteCheckedException { + assert vis != null ^ verVis != null; + assert verVis == null || locStore; - if (log.isDebugEnabled()) - log.debug("Loading values from store for keys: " + keys0); + final boolean convert = verVis == null; - boolean ses = initSession(tx); + if (!keys.isEmpty()) { + if (keys.size() == 1) { + K key = F.first(keys); - try { - if (keys.size() > singleThreadGate.loadAllThreshold()) { - Map<K, Object> map = store.loadAll(keys0); + if (convert) + vis.apply(key, loadFromStore(tx, key)); + else { + IgniteBiTuple<V, GridCacheVersion> t = + (IgniteBiTuple<V, GridCacheVersion>)loadFromStore(tx, key, false); + + if (t != null) + verVis.apply(key, t.get1(), t.get2()); + } - if (map != null) { - for (Map.Entry<K, Object> e : map.entrySet()) { - K k = e.getKey(); + return; + } + + Collection<? extends K> keys0 = convertPortable ? + F.viewReadOnly(keys, new C1<K, K>() { + @Override public K apply(K k) { + return (K)cctx.unwrapPortableIfNeeded(k, false); + } + }) : + keys; - V v = convert(e.getValue()); + if (log.isDebugEnabled()) + log.debug("Loading values from store for keys: " + keys0); - if (cctx.portableEnabled()) { - k = (K)cctx.marshalToPortable(k); - v = (V)cctx.marshalToPortable(v); - } + boolean ses = initSession(tx); - vis.apply(k, v); + try { + CI2<K, Object> c = new CI2<K, Object>() { + @Override public void apply(K k, Object val) { + if (convert) { + V v = convert(val); + + if (cctx.portableEnabled()) { + k = (K)cctx.marshalToPortable(k); + v = (V)cctx.marshalToPortable(v); } + + vis.apply(k, v); + } + else { + IgniteBiTuple<V, GridCacheVersion> v = (IgniteBiTuple<V, GridCacheVersion>)val; + + if (v != null) + verVis.apply(k, v.get1(), v.get2()); } } - else { - singleThreadGate.loadAll(keys0, new CI2<K, Object>() { - @Override public void apply(K k, Object o) { - V v = convert(o); + }; - if 
(cctx.portableEnabled()) { - k = (K)cctx.marshalToPortable(k); - v = (V)cctx.marshalToPortable(v); - } + if (keys.size() > singleThreadGate.loadAllThreshold()) { + Map<K, Object> map = store.loadAll(keys0); - vis.apply(k, v); - } - }); + if (map != null) { + for (Map.Entry<K, Object> e : map.entrySet()) + c.apply(e.getKey(), e.getValue()); } } - catch (ClassCastException e) { - handleClassCastException(e); - } - catch (Exception e) { - throw U.cast(e); - } - finally { - if (ses) - sesHolder.set(null); - } - - if (log.isDebugEnabled()) - log.debug("Loaded values from store for keys: " + keys0); + else + singleThreadGate.loadAll(keys0, c); + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (CacheLoaderException e) { + throw new IgniteCheckedException(e); + } + catch (Exception e) { + throw new IgniteCheckedException(new CacheLoaderException(e)); + } + finally { + if (ses) + sesHolder.set(null); } - return true; - } - else { - for (K key : keys) - vis.apply(key, null); + if (log.isDebugEnabled()) + log.debug("Loaded values from store for keys: " + keys0); } - - return false; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738c67fd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 8a3d0d1..0dbf7dd 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -23,6 +23,7 @@ import org.apache.ignite.lang.*; import org.apache.ignite.plugin.security.*; 
import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; +import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.managers.communication.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; @@ -871,6 +872,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Optimistically expect that all keys are available locally (avoid creation of get future). for (K key : keys) { + if (key == null) + return new GridFinishedFuture<>(ctx.kernalContext(), new IgniteNullArgumentCheckedException("Key is null.")); + GridCacheEntryEx<K, V> entry = null; while (true) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738c67fd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoadAllAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoadAllAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoadAllAbstractTest.java new file mode 100644 index 0000000..5cbd2cb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoadAllAbstractTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.integration; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.*; + +import javax.cache.integration.*; +import java.util.*; + +/** + * Test for {@link javax.cache.Cache#loadAll(Set, boolean, CompletionListener)}. + */ +public abstract class IgniteCacheLoadAllAbstractTest extends IgniteCacheAbstractTest { + /** + * @throws Exception If failed. + */ + public void testLoadAll() throws Exception { + IgniteCache<Integer, String> cache = jcache(0); + + for (int i = 0; i < 1000; i++) + cache.put(i, String.valueOf(i)); + + stopAllGrids(); + + startGrids(); + + cache = jcache(0); + + Set<Integer> keys = new HashSet<>(); + + for (int i = 0; i < 100; i++) + keys.add(i); + + Set<Integer> nonExistKeys = new HashSet<>(); + + for (int i = 10_000; i < 10_010; i++) + nonExistKeys.add(i); + + keys.addAll(nonExistKeys); + + CompletionListener lsnr = new CompletionListenerFuture(); + + cache.loadAll(keys, false, lsnr); + + for (int i = 0; i < gridCount(); i++) { + + } + } +}