# ignite-51
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/365b3598 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/365b3598 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/365b3598 Branch: refs/heads/ignite-410 Commit: 365b35988646dd72d96e723409286e74e7fed547 Parents: 52c6106 Author: sboikov <semen.boi...@inria.fr> Authored: Mon Mar 9 21:05:18 2015 +0300 Committer: sboikov <semen.boi...@inria.fr> Committed: Mon Mar 9 21:14:56 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 37 +++++++++++++++++--- .../cache/GridCacheProjectionImpl.java | 25 +++++++------ .../distributed/dht/GridDhtCacheAdapter.java | 7 +--- .../dataload/IgniteDataLoaderImpl.java | 26 ++++++-------- ...eJdbcStoreAbstractMultithreadedSelfTest.java | 11 ++++-- 5 files changed, 64 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/365b3598/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 4762c03..d84d0dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1632,6 +1632,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { long newTtl; long newExpireTime; + Object key0 = null; + Object updated0 = null; + synchronized (this) { boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter); @@ -1655,8 +1658,35 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (op == GridCacheOperation.TRANSFORM) { transformClo = writeObj; - // TODO IGNITE-51 - writeObj0 = ((IgniteClosure)writeObj).apply(rawGetOrUnmarshalUnlocked(true)); + EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj; + + oldVal = rawGetOrUnmarshalUnlocked(true); + + CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(cctx, key, oldVal); + + try { + Object computed = entryProcessor.process(entry, invokeArgs); + + if (entry.modified()) { + writeObj0 = cctx.unwrapTemporary(entry.getValue()); + writeObj = cctx.toCacheObject(updated0); + } + else { + writeObj = oldVal; + writeObj0 = CU.value(oldVal, cctx, false); + } + + key0 = entry.key(); + + if (computed != null) + invokeRes = new IgniteBiTuple(cctx.unwrapTemporary(computed), null); + } + catch (Exception e) { + invokeRes = new IgniteBiTuple(null, e); + + writeObj = oldVal; + writeObj0 = CU.value(oldVal, cctx, false); + } } else writeObj0 = CU.value((CacheObject)writeObj, cctx, false); @@ -1780,7 +1810,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { boolean readThrough = false; Object old0 = null; - Object updated0 = null; if (needVal && oldVal == null && (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) { @@ -1846,8 +1875,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } } - Object key0 = null; - // Calculate new value in case we met transform. if (op == GridCacheOperation.TRANSFORM) { assert conflictCtx == null : "Cannot be TRANSFORM here if conflict resolution was performed earlier."; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/365b3598/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index 342402c..1cacf36 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -22,6 +22,7 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.dr.*; +import org.apache.ignite.internal.processors.cache.local.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -188,19 +189,17 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V if (k == null || v == null) return false; - // TODO IGNITE-51. - IgniteBiPredicate<K, V> p = null; - - if (p != null) { - CacheFlag[] f = cctx.forceLocalRead(); - - try { - if (!p.apply(k, v)) - return false; - } - finally { - cctx.forceFlags(f); - } + if (filter != null) { + GridLocalCacheEntry e = new GridLocalCacheEntry(cctx, + cctx.toCacheKeyObject(k), + k.hashCode(), + cctx.toCacheObject(v), + null, + 0, + 0); + + if (!filter.apply(e)) + return false; } return true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/365b3598/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 6136e5f..c8aac7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -378,12 +378,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry(); - // TODO IGNITE-51. - Collection<KeyCacheObject> keys0 = F.viewReadOnly(keys, new C1<K, KeyCacheObject>() { - @Override public KeyCacheObject apply(K key) { - return ctx.toCacheKeyObject(key); - } - }); + Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys); ctx.store().loadAllFromStore(null, keys0, new CI2<KeyCacheObject, Object>() { @Override public void apply(KeyCacheObject key, Object val) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/365b3598/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index 0678d1f..81fdbed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@ -375,18 +375,6 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) { A.notEmpty(entries, "entries"); - // TODO IGNITE-51. - Collection<? extends IgniteDataLoaderEntry> entries0 = F.viewReadOnly(entries, new C1<Entry<K, V>, IgniteDataLoaderEntry>() { - @Override public IgniteDataLoaderEntry apply(Entry<K, V> e) { - KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, e.getKey()); - CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, e.getValue()); - - return new IgniteDataLoaderEntry(key, val); - } - }); - - return addDataInternal(entries0); - /* enterBusy(); try { @@ -402,10 +390,19 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1); for (Map.Entry<K, V> entry : entries) - keys.add(entry.getKey()); + keys.add(cacheObjProc.toCacheKeyObject(cacheObjCtx, entry.getKey())); } - load0(entries, resFut, keys, 0); + Collection<? extends IgniteDataLoaderEntry> entries0 = F.viewReadOnly(entries, new C1<Entry<K, V>, IgniteDataLoaderEntry>() { + @Override public IgniteDataLoaderEntry apply(Entry<K, V> e) { + KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, e.getKey()); + CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, e.getValue()); + + return new IgniteDataLoaderEntry(key, val); + } + }); + + load0(entries0, resFut, keys, 0); return new IgniteFutureImpl<>(resFut); } @@ -415,7 +412,6 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay finally { leaveBusy(); } - */ } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/365b3598/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java index dd9314b..6675616 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java @@ -222,11 +222,16 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach for (int i = 0; i < TX_CNT; i++) { int cnt = rnd.nextInt(BATCH_CNT); - Map<Object, Object> map = U.newHashMap(cnt); + List<Integer> ids = new ArrayList<>(cnt); - for (int j = 0; j < cnt; j++) { - int id = rnd.nextInt(); + for (int j = 0; j < cnt; j++) + ids.add(rnd.nextInt()); + Collections.sort(ids); + + Map<Object, Object> map = U.newLinkedHashMap(cnt); + + for (Integer id : ids) { if (rnd.nextBoolean()) map.put(new OrganizationKey(id), new Organization(id, "Name" + id, "City" + id)); else