Repository: incubator-ignite Updated Branches: refs/heads/ignite-110 [created] 9ec7c31c1
# ignite-110 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9ec7c31c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9ec7c31c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9ec7c31c Branch: refs/heads/ignite-110 Commit: 9ec7c31c16d5653ec9b4799276faa3b297ecc45c Parents: 3c948be Author: sboikov <sboi...@gridgain.com> Authored: Thu Jan 22 11:32:05 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jan 22 11:32:05 2015 +0300 ---------------------------------------------------------------------- .../examples/datagrid/CacheApiExample.java | 10 -- .../java/org/apache/ignite/IgniteCache.java | 172 +------------------ .../apache/ignite/IgniteCachingProvider.java | 2 +- .../org/apache/ignite/IgniteDataLoader.java | 20 ++- .../java/org/apache/ignite/cache/CacheFlag.java | 74 -------- .../processors/cache/IgniteCacheProxy.java | 95 ++-------- .../dataload/GridDataLoadCacheUpdaters.java | 6 +- .../dataload/GridDataLoadRequest.java | 36 +++- .../dataload/GridDataLoadUpdateJob.java | 12 +- .../dataload/GridDataLoaderProcessor.java | 13 +- .../dataload/IgniteDataLoaderImpl.java | 18 +- .../dr/GridDrDataLoadCacheUpdater.java | 2 +- .../cache/IgniteCacheInvokeAbstractTest.java | 4 +- .../tcp/GridCacheDhtLockBackupSelfTest.java | 2 +- .../GridCacheInterceptorAbstractSelfTest.java | 42 +---- .../GridCacheReturnValueTransferSelfTest.java | 3 +- .../distributed/GridCacheLockAbstractTest.java | 4 +- ...GridCacheValueConsistencyAtomicSelfTest.java | 4 +- .../GridDataLoaderProcessorSelfTest.java | 94 ++++++++++ 19 files changed, 216 insertions(+), 397 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/examples/src/main/java/org/gridgain/examples/datagrid/CacheApiExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/gridgain/examples/datagrid/CacheApiExample.java b/examples/src/main/java/org/gridgain/examples/datagrid/CacheApiExample.java index bab8736..ce64376 100644 --- a/examples/src/main/java/org/gridgain/examples/datagrid/CacheApiExample.java +++ b/examples/src/main/java/org/gridgain/examples/datagrid/CacheApiExample.java @@ -101,16 +101,6 @@ public class CacheApiExample { boolean b2 = cache.putIfAbsent(4, "44"); assert b1 && !b2; - - // Put-with-predicate, will succeed if predicate evaluates to true. - cache.put(5, "5"); - cache.putIf(5, "55", new IgnitePredicate<GridCacheEntry<Integer, String>>() { - @Override - public boolean apply(GridCacheEntry<Integer, String> e) { - return "5".equals(e.peek()); // Update only if previous value is "5". - } - }); - // Invoke - assign new value based on previous value. cache.put(6, "6"); cache.invoke(6, new EntryProcessor<Integer, String, Void>() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 1398ebe..ba33aa5 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -90,6 +90,11 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc); /** + * @return Cache with read-through write-through behavior disabled. + */ + public IgniteCache<K, V> withSkipStore(); + + /** * Executes {@link #localLoadCache(IgniteBiPredicate, Object...)} on all cache nodes. * * @param p Optional predicate (may be {@code null}). If provided, will be used to @@ -155,32 +160,6 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @Nullable public V getAndPutIfAbsent(K key, V val) throws CacheException; /** - * Removes mappings from cache for entries for which the optionally passed in filters do - * pass. If passed in filters are {@code null}, then all entries in cache will be enrolled - * into transaction. - * <p> - * <b>USE WITH CARE</b> - if your cache has many entries that pass through the filter or if filter - * is empty, then transaction will quickly become very heavy and slow. Also, locks - * are acquired in undefined order, so it may cause a deadlock when used with - * other concurrent transactional updates. - * <p> - * If write-through is enabled, the values will be removed from {@link GridCacheStore} - * via {@link GridCacheStore#removeAll(IgniteTx, java.util.Collection)} method. - * <h2 class="header">Transactions</h2> - * This method is transactional and will enlist the entry into ongoing transaction - * if there is one. - * <h2 class="header">Cache Flags</h2> - * This method is not available if any of the following flags are set on projection: - * {@link org.gridgain.grid.cache.GridCacheFlag#LOCAL}, {@link org.gridgain.grid.cache.GridCacheFlag#READ}. - * - * @param filter Filter used to supply keys for remove operation (if {@code null}, - * then nothing will be removed). - * @throws CacheException If remove failed. - * @throws org.gridgain.grid.cache.GridCacheFlagException If flags validation failed. - */ - public void removeAll(IgnitePredicate<Entry<K, V>> filter) throws CacheException; - - /** * Return a {@link CacheLock} instance associated with passed key. * This method does not acquire lock immediately, you have to call appropriate method on returned instance. * @@ -329,132 +308,6 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public int localSize(CachePeekMode... peekModes); /** - * Stores given key-value pair in cache. If filters are provided, then entries will - * be stored in cache only if they pass the filter. Note that filter check is atomic, - * so value stored in cache is guaranteed to be consistent with the filters. If cache - * previously contained value for the given key, then this value is returned. - * In case of {@link GridCacheMode#PARTITIONED} or {@link GridCacheMode#REPLICATED} caches, - * the value will be loaded from the primary node, which in its turn may load the value - * from the swap storage, and consecutively, if it's not in swap, - * from the underlying persistent storage. If value has to be loaded from persistent - * storage, {@link GridCacheStore#load(IgniteTx, Object)} method will be used. - * <p> - * If the returned value is not needed, method {@link #putIf(Object, Object, IgnitePredicate)} should - * always be used instead of this one to avoid the overhead associated with returning of the previous value. - * <p> - * If write-through is enabled, the stored value will be persisted to {@link GridCacheStore} - * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method. - * <h2 class="header">Transactions</h2> - * This method is transactional and will enlist the entry into ongoing transaction - * if there is one. - * <h2 class="header">Cache Flags</h2> - * This method is not available if any of the following flags are set on projection: - * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}. - * - * @param key Key to store in cache. - * @param val Value to be associated with the given key. - * @param filter Optional filter to check prior to putting value in cache. Note - * that filter check is atomic with put operation. - * @return Previous value associated with specified key, or {@code null} - * if entry did not pass the filter, or if there was no mapping for the key in swap - * or in persistent storage. - * @throws NullPointerException If either key or value are {@code null}. - * @throws GridCacheFlagException If projection flags validation failed. - */ - // TODO IGNITE-1 fix entry type. - @Nullable public V getAndPutIf(K key, V val, @Nullable IgnitePredicate<GridCacheEntry<K, V>> filter); - - /** - * Stores given key-value pair in cache. If filters are provided, then entries will - * be stored in cache only if they pass the filter. Note that filter check is atomic, - * so value stored in cache is guaranteed to be consistent with the filters. - * <p> - * This method will return {@code true} if value is stored in cache and {@code false} otherwise. - * Unlike {@link #getAndPutIf(Object, Object, IgnitePredicate)} method, it does not return previous - * value and, therefore, does not have any overhead associated with returning a value. It - * should be used whenever return value is not required. - * <p> - * If write-through is enabled, the stored value will be persisted to {@link GridCacheStore} - * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method. - * <h2 class="header">Transactions</h2> - * This method is transactional and will enlist the entry into ongoing transaction - * if there is one. - * <h2 class="header">Cache Flags</h2> - * This method is not available if any of the following flags are set on projection: - * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}. - * - * @param key Key to store in cache. - * @param val Value to be associated with the given key. - * @param filter Optional filter to check prior to putting value in cache. Note - * that filter check is atomic with put operation. - * @return {@code True} if optional filter passed and value was stored in cache, - * {@code false} otherwise. Note that this method will return {@code true} if filter is not - * specified. - * @throws NullPointerException If either key or value are {@code null}. - * @throws GridCacheFlagException If projection flags validation failed. - */ - // TODO IGNITE-1 fix entry type. - public boolean putIf(K key, V val, IgnitePredicate<GridCacheEntry<K, V>> filter); - - /** - * Removes given key mapping from cache. If cache previously contained value for the given key, - * then this value is returned. In case of {@link GridCacheMode#PARTITIONED} or {@link GridCacheMode#REPLICATED} - * caches, the value will be loaded from the primary node, which in its turn may load the value - * from the disk-based swap storage, and consecutively, if it's not in swap, - * from the underlying persistent storage. If value has to be loaded from persistent - * storage, {@link GridCacheStore#load(IgniteTx, Object)} method will be used. - * <p> - * If the returned value is not needed, method {@link #removeIf(Object, IgnitePredicate)} should - * always be used instead of this one to avoid the overhead associated with returning of the - * previous value. - * <p> - * If write-through is enabled, the value will be removed from {@link GridCacheStore} - * via {@link GridCacheStore#remove(IgniteTx, Object)} method. - * <h2 class="header">Transactions</h2> - * This method is transactional and will enlist the entry into ongoing transaction - * if there is one. - * <h2 class="header">Cache Flags</h2> - * This method is not available if any of the following flags are set on projection: - * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}. - * - * @param key Key whose mapping is to be removed from cache. - * @param filter Optional filter to check prior to removing value form cache. Note - * that filter is checked atomically together with remove operation. - * @return Previous value associated with specified key, or {@code null} - * if there was no value for this key. - * @throws NullPointerException If key is {@code null}. - * @throws GridCacheFlagException If projection flags validation failed. - */ - // TODO IGNITE-1 fix entry type. - public V getAndRemoveIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter); - - /** - * Removes given key mapping from cache. - * <p> - * This method will return {@code true} if remove did occur, which means that all optionally - * provided filters have passed and there was something to remove, {@code false} otherwise. - * <p> - * If write-through is enabled, the value will be removed from {@link GridCacheStore} - * via {@link GridCacheStore#remove(IgniteTx, Object)} method. - * <h2 class="header">Transactions</h2> - * This method is transactional and will enlist the entry into ongoing transaction - * if there is one. - * <h2 class="header">Cache Flags</h2> - * This method is not available if any of the following flags are set on projection: - * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}. - * - * @param key Key whose mapping is to be removed from cache. - * @param filter Optional filter to check prior to removing value form cache. Note - * that filter is checked atomically together with remove operation. - * @return {@code True} if filter passed validation and entry was removed, {@code false} otherwise. - * Note that if filter is not specified, this method will return {@code true}. - * @throws NullPointerException if the key is {@code null}. - * @throws GridCacheFlagException If projection flags validation failed. - */ - // TODO IGNITE-1 fix entry type. - public boolean removeIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter); - - /** * @param map Map containing keys and entry processors to be applied to values. * @param args Additional arguments to pass to the {@link EntryProcessor}. * @return The map of {@link EntryProcessorResult}s of the processing per key, @@ -502,19 +355,4 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * @return Projection for portable objects. */ public <K1, V1> IgniteCache<K1, V1> keepPortable(); - - /** - * Gets cache projection base on this one, but with the specified flags turned on. - * <h1 class="header">Cache Flags</h1> - * The resulting projection will inherit all the flags from this projection. - * - * @param flags Flags to turn on (if empty, then no-op). - * @return New projection based on this one, but with the specified flags turned on. - */ - public IgniteCache<K, V> flagsOn(@Nullable GridCacheFlag... flags); - - /** - * @return Ignite instance. - */ - public Ignite ignite(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/main/java/org/apache/ignite/IgniteCachingProvider.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCachingProvider.java b/modules/core/src/main/java/org/apache/ignite/IgniteCachingProvider.java index ea97cb0..1c325dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCachingProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCachingProvider.java @@ -119,7 +119,7 @@ public class IgniteCachingProvider implements CachingProvider { * @param cache Cache. */ public CacheManager findManager(IgniteCache<?,?> cache) { - Ignite ignite = cache.ignite(); + Ignite ignite = cache.unwrap(Ignite.class); synchronized (cacheManagers) { for (Map<URI, IgniteCacheManager> map : cacheManagers.values()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java index 21e5036..c602ae5 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java @@ -76,7 +76,7 @@ import java.util.*; * updates and allow data loader choose most optimal concurrent implementation. * </li> * <li> - * {@link #updater(org.apache.ignite.dataload.IgniteDataLoadCacheUpdater)} - defines how cache will be updated with loaded entries. + * {@link #updater(IgniteDataLoadCacheUpdater)} - defines how cache will be updated with loaded entries. * It allows to provide user-defined custom logic to update the cache in the most effective and flexible way. * </li> * <li> @@ -113,7 +113,7 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable { /** * Sets flag indicating that this data loader should assume that there are no other concurrent updates to the cache. - * Should not be used when custom cache updater set using {@link #updater(org.apache.ignite.dataload.IgniteDataLoadCacheUpdater)} method. + * Should not be used when custom cache updater set using {@link #updater(IgniteDataLoadCacheUpdater)} method. * Default is {@code false}. * * @param isolated Flag value. @@ -122,6 +122,22 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable { public void isolated(boolean isolated) throws IgniteCheckedException; /** + * Gets flag indicating that write-through behavior should be disabled for data loading. + * Default is {@code false}. + * + * @return Skip store flag. + */ + public boolean skipStore(); + + /** + * Sets flag indicating that write-through behavior should be disabled for data loading. + * Default is {@code false}. + * + * @param skipStore Skip store flag. + */ + public void skipStore(boolean skipStore); + + /** * Gets size of per node key-value pairs buffer. * * @return Per node buffer size. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/main/java/org/apache/ignite/cache/CacheFlag.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheFlag.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheFlag.java deleted file mode 100644 index d441096..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheFlag.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache; - -import org.apache.ignite.transactions.*; -import org.jetbrains.annotations.*; - -/** - * TODO: - * 1. Get rid of SKIP_STORE, SKIP_SWAP, LOCAL, READ, CLONE - * 2. Other properties should be moved to cache configuration. - * 3. This enum should become obsolete and removed. - */ -public enum CacheFlag { - /** Skips store, i.e. no read-through and no write-through behavior. */ - SKIP_STORE, - - /** Skip swap space for reads and writes. */ - SKIP_SWAP, - - /** Synchronous commit. */ - SYNC_COMMIT, - - /** - * Switches a cache projection to work in {@code 'invalidation'} mode. - * Instead of updating remote entries with new values, small invalidation - * messages will be sent to set the values to {@code null}. - * - * @see IgniteTx#isInvalidate() - * @see org.gridgain.grid.cache.GridCacheConfiguration#isInvalidate() - */ - INVALIDATE, - - /** - * Skips version check during {@link org.gridgain.grid.cache.GridCacheProjection#transform(Object, GridClosure)} writes in - * {@link org.gridgain.grid.cache.GridCacheAtomicityMode#ATOMIC} mode. By default, in {@code ATOMIC} mode, whenever - * {@code transform(...)} is called, cache values (and not the {@code transform} closure) are sent from primary - * node to backup nodes to ensure proper update ordering. - * <p> - * By setting this flag, version check is skipped, and the {@code transform} closure is applied on both, primary - * and backup nodes. Use this flag for better performance if you are sure that there are no - * concurrent updates happening for the same key when {@code transform(...)} method is called. - */ - FORCE_TRANSFORM_BACKUP; - - /** */ - private static final CacheFlag[] VALS = values(); - - /** - * Efficiently gets enumerated value from its ordinal. - * - * @param ord Ordinal value. - * @return Enumerated value or {@code null} if ordinal out of range. - */ - @Nullable - public static CacheFlag fromOrdinal(int ord) { - return ord >= 0 && ord < VALS.length ? VALS[ord] : null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 31b17d8..df98fc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -89,13 +89,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements return ctx; } - /** - * @return Ignite instance. - */ - @Override public GridEx ignite() { - return ctx.grid(); - } - /** {@inheritDoc} */ @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) { GridCacheConfiguration cfg = ctx.config(); @@ -127,6 +120,11 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements } /** {@inheritDoc} */ + @Override public IgniteCache<K, V> withSkipStore() { + return flagsOn(GridCacheFlag.SKIP_STORE); + } + + /** {@inheritDoc} */ @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -165,74 +163,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements } /** {@inheritDoc} */ - @Nullable @Override public V getAndPutIf(K key, V val, IgnitePredicate<GridCacheEntry<K, V>> filter) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - return delegate.put(key, val, filter); - } - finally { - gate.leave(prev); - } - } - catch (IgniteCheckedException e) { - throw cacheException(e); - } - } - - /** {@inheritDoc} */ - @Override public boolean putIf(K key, V val, IgnitePredicate<GridCacheEntry<K, V>> filter) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - return delegate.putx(key, val, filter); - } - finally { - gate.leave(prev); - } - } - catch (IgniteCheckedException e) { - throw cacheException(e); - } - } - - /** {@inheritDoc} */ - @Override public V getAndRemoveIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - return delegate.remove(key, filter); - } - finally { - gate.leave(prev); - } - } - catch (IgniteCheckedException e) { - throw cacheException(e); - } - } - - /** {@inheritDoc} */ - @Override public boolean removeIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - return delegate.removex(key, filter); - } - finally { - gate.leave(prev); - } - } - catch (IgniteCheckedException e) { - throw cacheException(e); - } - } - - /** {@inheritDoc} */ @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException { try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -250,12 +180,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements } /** {@inheritDoc} */ - @Override public void removeAll(IgnitePredicate filter) throws CacheException { - // TODO IGNITE-1. - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ @Override public CacheLock lock(K key) throws CacheException { return lockAll(Collections.<K>singleton(key)); } @@ -838,6 +762,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements @Override public <T> T unwrap(Class<T> clazz) { if (clazz.equals(IgniteCache.class)) return (T)this; + else if (clazz.equals(Ignite.class)) + return (T)ctx.grid(); throw new IllegalArgumentException("Unsupported class: " + clazz); } @@ -971,8 +897,11 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements } } - /** {@inheritDoc} */ - @Override public IgniteCache<K, V> flagsOn(@Nullable GridCacheFlag... flags) { + /** + * @param flags Flags to turn on (if empty, then no-op). + * @return Cache with given flags enabled. + */ + public IgniteCache<K, V> flagsOn(@Nullable GridCacheFlag... flags) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java index 03e1c80..2b66400 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java @@ -243,7 +243,9 @@ public class GridDataLoadCacheUpdaters { Map<Integer, Collection<K>> rmvPartMap = null; Map<Integer, Map<K, V>> putPartMap = null; - GridCacheAffinity<K> aff = cache.ignite().<K, V>cache(cache.getName()).affinity(); + Ignite ignite = cache.unwrap(Ignite.class); + + GridCacheAffinity<K> aff = ignite.<K, V>cache(cache.getName()).affinity(); for (Map.Entry<K, V> entry : entries) { K key = entry.getKey(); @@ -272,7 +274,7 @@ public class GridDataLoadCacheUpdaters { } } - IgniteTransactions txs = cache.ignite().transactions(); + IgniteTransactions txs = ignite.transactions(); for (Map.Entry<Integer, Integer> e : partsCounts.entrySet()) { Integer part = e.getKey(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadRequest.java index a9a1488..849b875 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadRequest.java @@ -31,7 +31,7 @@ import java.util.*; /** * */ -public class GridDataLoadRequest<K, V> extends GridTcpCommunicationMessageAdapter { +public class GridDataLoadRequest extends GridTcpCommunicationMessageAdapter { /** */ private static final long serialVersionUID = 0L; @@ -54,6 +54,9 @@ public class GridDataLoadRequest<K, V> extends GridTcpCommunicationMessageAdapte private boolean ignoreDepOwnership; /** */ + private boolean skipStore; + + /** */ private IgniteDeploymentMode depMode; /** */ @@ -87,6 +90,7 @@ public class GridDataLoadRequest<K, V> extends GridTcpCommunicationMessageAdapte * @param updaterBytes Cache updater. * @param colBytes Collection bytes. * @param ignoreDepOwnership Ignore ownership. + * @param skipStore Skip store flag. * @param depMode Deployment mode. * @param sampleClsName Sample class name. * @param userVer User version. @@ -100,6 +104,7 @@ public class GridDataLoadRequest<K, V> extends GridTcpCommunicationMessageAdapte byte[] updaterBytes, byte[] colBytes, boolean ignoreDepOwnership, + boolean skipStore, IgniteDeploymentMode depMode, String sampleClsName, String userVer, @@ -112,6 +117,7 @@ public class GridDataLoadRequest<K, V> extends GridTcpCommunicationMessageAdapte this.updaterBytes = updaterBytes; this.colBytes = colBytes; this.ignoreDepOwnership = ignoreDepOwnership; + this.skipStore = skipStore; this.depMode = depMode; this.sampleClsName = sampleClsName; this.userVer = userVer; @@ -163,6 +169,13 @@ public class GridDataLoadRequest<K, V> extends GridTcpCommunicationMessageAdapte } /** + * @return Skip store flag. + */ + public boolean skipStore() { + return skipStore; + } + + /** * @return Deployment mode. */ public IgniteDeploymentMode deploymentMode() { @@ -314,12 +327,18 @@ public class GridDataLoadRequest<K, V> extends GridTcpCommunicationMessageAdapte commState.idx++; case 10: - if (!commState.putByteArray(updaterBytes)) + if (!commState.putBoolean(skipStore)) return false; commState.idx++; case 11: + if (!commState.putByteArray(updaterBytes)) + return false; + + commState.idx++; + + case 12: if (!commState.putString(userVer)) return false; @@ -401,7 +420,7 @@ public class GridDataLoadRequest<K, V> extends GridTcpCommunicationMessageAdapte if (commState.readSize >= 0) { if (ldrParticipants == null) - ldrParticipants = U.newHashMap(commState.readSize); + ldrParticipants = new HashMap<>(commState.readSize, 1.0f); for (int i = commState.readItems; i < commState.readSize; i++) { if (!commState.keyDone) { @@ -462,6 +481,14 @@ public class GridDataLoadRequest<K, V> extends GridTcpCommunicationMessageAdapte commState.idx++; case 10: + if (buf.remaining() < 1) + return false; + + skipStore = commState.getBoolean(); + + commState.idx++; + + case 11: byte[] updaterBytes0 = commState.getByteArray(); if (updaterBytes0 == BYTE_ARR_NOT_READ) @@ -471,7 +498,7 @@ public class GridDataLoadRequest<K, V> extends GridTcpCommunicationMessageAdapte commState.idx++; - case 11: + case 12: String userVer0 = commState.getString(); if (userVer0 == STR_NOT_READ) @@ -510,6 +537,7 @@ public class GridDataLoadRequest<K, V> extends GridTcpCommunicationMessageAdapte _clone.updaterBytes = updaterBytes; _clone.colBytes = colBytes; _clone.ignoreDepOwnership = ignoreDepOwnership; + _clone.skipStore = skipStore; _clone.depMode = depMode; _clone.sampleClsName = sampleClsName; _clone.userVer = userVer; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java index 8027071..bb45da8 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java @@ -46,6 +46,9 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> { private final boolean ignoreDepOwnership; /** */ + private final boolean skipStore; + + /** */ private final IgniteDataLoadCacheUpdater<K, V> updater; /** @@ -57,9 +60,12 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> { * @param updater Updater. */ GridDataLoadUpdateJob( - GridKernalContext ctx, IgniteLogger log, @Nullable String cacheName, + GridKernalContext ctx, + IgniteLogger log, + @Nullable String cacheName, Collection<Map.Entry<K, V>> col, boolean ignoreDepOwnership, + boolean skipStore, IgniteDataLoadCacheUpdater<K, V> updater) { this.ctx = ctx; this.log = log; @@ -70,6 +76,7 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> { this.cacheName = cacheName; this.col = col; this.ignoreDepOwnership = ignoreDepOwnership; + this.skipStore = skipStore; this.updater = updater; } @@ -91,6 +98,9 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> { IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName); + if (skipStore) + cache = (IgniteCacheProxy<K, V>)cache.withSkipStore(); + if (ignoreDepOwnership) cache.context().deploy().ignoreOwnership(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessor.java index b032006..fba509a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessor.java @@ -68,7 +68,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter { @Override public void onMessage(UUID nodeId, Object msg) { assert msg instanceof GridDataLoadRequest; - processDataLoadRequest(nodeId, (GridDataLoadRequest<K, V>)msg); + processDataLoadRequest(nodeId, (GridDataLoadRequest)msg); } }); @@ -184,7 +184,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter { * @param nodeId Sender ID. * @param req Request. */ - private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest<K, V> req) { + private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest req) { if (!busyLock.enterBusy()) { if (log.isDebugEnabled()) log.debug("Ignoring data load request (node is stopping): " + req); @@ -251,8 +251,13 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter { return; } - GridDataLoadUpdateJob<K, V> job = new GridDataLoadUpdateJob<>(ctx, log, req.cacheName(), col, - req.ignoreDeploymentOwnership(), updater); + GridDataLoadUpdateJob<K, V> job = new GridDataLoadUpdateJob<>(ctx, + log, + req.cacheName(), + col, + req.ignoreDeploymentOwnership(), + req.skipStore(), + updater); Exception err = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/IgniteDataLoaderImpl.java index 9591e99..d77b15e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/IgniteDataLoaderImpl.java @@ -149,6 +149,9 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay /** */ private final DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ; + /** */ + private boolean skipStore; + /** * @param ctx Grid kernal context. * @param cacheName Cache name. @@ -297,6 +300,16 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay } /** {@inheritDoc} */ + @Override public boolean skipStore() { + return skipStore; + } + + /** {@inheritDoc} */ + @Override public void skipStore(boolean skipStore) { + this.skipStore = skipStore; + } + + /** {@inheritDoc} */ @Override @Nullable public String cacheName() { return cacheName; } @@ -905,7 +918,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay if (isLocNode) { fut = ctx.closure().callLocalSafe( - new GridDataLoadUpdateJob<>(ctx, log, cacheName, entries, false, updater), false); + new GridDataLoadUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false); locFuts.add(fut); @@ -995,13 +1008,14 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay reqs.put(reqId, (GridFutureAdapter<Object>)fut); - GridDataLoadRequest<Object, Object> req = new GridDataLoadRequest<>( + GridDataLoadRequest req = new GridDataLoadRequest( reqId, topicBytes, cacheName, updaterBytes, entriesBytes, true, + skipStore, dep != null ? dep.deployMode() : null, dep != null ? jobPda0.deployClass().getName() : null, dep != null ? dep.userVersion() : null, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dr/GridDrDataLoadCacheUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dr/GridDrDataLoadCacheUpdater.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dr/GridDrDataLoadCacheUpdater.java index c2b8dac..686814f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dr/GridDrDataLoadCacheUpdater.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dr/GridDrDataLoadCacheUpdater.java @@ -41,7 +41,7 @@ public class GridDrDataLoadCacheUpdater<K, V> implements IgniteDataLoadCacheUpda throws IgniteCheckedException { String cacheName = cache0.getConfiguration(GridCacheConfiguration.class).getName(); - GridKernalContext ctx = ((GridKernal)cache0.ignite()).context(); + GridKernalContext ctx = ((GridKernal)cache0.unwrap(Ignite.class)).context(); IgniteLogger log = ctx.log(GridDrDataLoadCacheUpdater.class); GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java index 523cbf1..1a3fe32 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java @@ -57,7 +57,7 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT invoke(cache, OPTIMISTIC); } else if (gridCount() > 1) { - cache = cache.flagsOn(FORCE_TRANSFORM_BACKUP); + cache = ((IgniteCacheProxy<Integer, Integer>)cache).flagsOn(FORCE_TRANSFORM_BACKUP); invoke(cache, null); } @@ -186,7 +186,7 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT invokeAll(cache, OPTIMISTIC); } else if (gridCount() > 1) { - cache = cache.flagsOn(FORCE_TRANSFORM_BACKUP); + cache = ((IgniteCacheProxy<Integer, Integer>)cache).flagsOn(FORCE_TRANSFORM_BACKUP); invokeAll(cache, null); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridCacheDhtLockBackupSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridCacheDhtLockBackupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridCacheDhtLockBackupSelfTest.java index 54157c2..7cd6ec4 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridCacheDhtLockBackupSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridCacheDhtLockBackupSelfTest.java @@ -189,7 +189,7 @@ public class GridCacheDhtLockBackupSelfTest extends GridCommonAbstractTest { info("Before remove all"); - cache1.flagsOn(GridCacheFlag.SYNC_COMMIT).removeAll(); + cache1.removeAll(); info("Remove all completed"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java index 682120a..58c705e 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java @@ -377,8 +377,7 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst return dataNodes + (storeEnabled() ? 1 : 0); // One call before store is updated. else { // If update goes through primary node and it is cancelled then backups aren't updated. - return (writeOrderMode() == PRIMARY || - (op == Operation.TRANSFORM || op == Operation.UPDATE_FILTER)) ? 1 : dataNodes; + return (writeOrderMode() == PRIMARY || op == Operation.TRANSFORM) ? 1 : dataNodes; } } @@ -392,10 +391,8 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst if (atomicityMode() == TRANSACTIONAL) // Update + after update + one call before store is updated. return dataNodes * 2 + (storeEnabled() ? 1 : 0); - else { - return (writeOrderMode() == PRIMARY || - (op == Operation.TRANSFORM || op == Operation.UPDATE_FILTER)) ? 2 : dataNodes * 2; - } + else + return (writeOrderMode() == PRIMARY || op == Operation.TRANSFORM) ? 2 : dataNodes * 2; } /** @@ -829,10 +826,6 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst for (IgniteTxConcurrency txConcurrency : IgniteTxConcurrency.values()) { for (IgniteTxIsolation txIsolation : IgniteTxIsolation.values()) { for (Operation op : Operation.values()) { - // TODO: GG-8118 enable when fixed. - if (op == Operation.UPDATE_FILTER && txConcurrency == OPTIMISTIC) - continue; - testNearNodeKey(txConcurrency, txIsolation, op); afterTest(); @@ -1200,18 +1193,6 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst break; } - case UPDATE_FILTER: { - Object old = cache.getAndRemoveIf(key, new IgnitePredicate<GridCacheEntry<String, Integer>>() { - @Override public boolean apply(GridCacheEntry<String, Integer> entry) { - return true; - } - }); - - assertEquals(expRmvRet, old); - - break; - } - case TRANSFORM: { cache.invoke(key, new EntryProcessor<String, Integer, Void>() { @Override public Void process(MutableEntry<String, Integer> e, Object... args) { @@ -1246,18 +1227,6 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst break; } - case UPDATE_FILTER: { - Object old = cache.getAndPutIf(key, val, new P1<GridCacheEntry<String, Integer>>() { - @Override public boolean apply(GridCacheEntry<String, Integer> entry) { - return true; - } - }); - - assertEquals(expOld, old); - - break; - } - case TRANSFORM: { cache.invoke(key, new EntryProcessor<String, Integer, Void>() { @Override public Void process(MutableEntry<String, Integer> e, Object... args) { @@ -1490,11 +1459,6 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst * */ TRANSFORM, - - /** - * - */ - UPDATE_FILTER } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java index b6d4aee..68d4e62 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java @@ -19,6 +19,7 @@ package org.gridgain.grid.kernal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.*; import org.gridgain.grid.cache.*; import org.gridgain.testframework.junits.common.*; @@ -141,7 +142,7 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest IgniteCache<Integer, TestObject> cache = grid(2).jcache(null); if (backups > 0 && atomicityMode == ATOMIC) - cache = cache.flagsOn(FORCE_TRANSFORM_BACKUP); + cache = ((IgniteCacheProxy<Integer, TestObject>)cache).flagsOn(FORCE_TRANSFORM_BACKUP); for (int i = 0; i < 100; i++) cache.put(i, new TestObject()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheLockAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheLockAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheLockAbstractTest.java index 44c86e5..d096b26 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheLockAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheLockAbstractTest.java @@ -121,11 +121,11 @@ public abstract class GridCacheLockAbstractTest extends GridCommonAbstractTest { info("Before 1st removeAll()."); - cache1.flagsOn(GridCacheFlag.SYNC_COMMIT).removeAll(); + cache1.removeAll(); info("Before 2nd removeAll()."); - cache2.flagsOn(GridCacheFlag.SYNC_COMMIT).removeAll(); + cache2.removeAll(); assert cache1.size() == 0 : "Cache is not empty: " + cache1; assert cache2.size() == 0 : "Cache is not empty: " + cache2; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java index 5ea4914..71fbdf7 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java @@ -18,6 +18,7 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht.atomic; import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.typedef.internal.*; @@ -29,6 +30,7 @@ import java.io.*; import java.util.concurrent.atomic.*; import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheFlag.*; /** * Tests cache value consistency for ATOMIC mode. @@ -73,7 +75,7 @@ public class GridCacheValueConsistencyAtomicSelfTest extends GridCacheValueConsi IgniteCache<Integer, Integer> cache = grid(idx).jcache(null); - cache = cache.flagsOn(GridCacheFlag.FORCE_TRANSFORM_BACKUP); + cache = ((IgniteCacheProxy<Integer, Integer>)cache).flagsOn(FORCE_TRANSFORM_BACKUP); cache.invoke(i, new Transformer(i)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ec7c31c/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java index 0cb0cbb..3f3cb84 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java @@ -22,11 +22,13 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.eviction.fifo.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.grid.cache.store.*; import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.testframework.junits.common.*; import org.jetbrains.annotations.*; @@ -47,6 +49,9 @@ import static org.apache.ignite.events.IgniteEventType.*; */ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { /** */ + private static ConcurrentHashMap<Object, Object> storeMap; + + /** */ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); /** */ @@ -61,6 +66,9 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { /** */ private boolean useGrpLock; + /** */ + private TestStore store; + /** {@inheritDoc} */ @Override public void afterTest() throws Exception { super.afterTest(); @@ -96,6 +104,8 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { cc.setEvictSynchronized(false); cc.setEvictNearSynchronized(false); + cc.setStore(store); + cfg.setCacheConfiguration(cc); } else @@ -750,6 +760,70 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testUpdateStore() throws Exception { + storeMap = new ConcurrentHashMap<>(); + + try { + store = new TestStore(); + + useCache = true; + + Ignite ignite = startGrid(1); + + startGrid(2); + startGrid(3); + + for (int i = 0; i < 1000; i++) + storeMap.put(i, i); + + try (IgniteDataLoader<Object, Object> ldr = ignite.dataLoader(null)) { + ldr.skipStore(false); + + for (int i = 0; i < 1000; i++) + ldr.removeData(i); + + for (int i = 1000; i < 2000; i++) + ldr.addData(i, i); + } + + for (int i = 0; i < 1000; i++) + assertNull(storeMap.get(i)); + + for (int i = 1000; i < 2000; i++) + assertEquals(i, storeMap.get(i)); + + try (IgniteDataLoader<Object, Object> ldr = ignite.dataLoader(null)) { + ldr.skipStore(true); + + for (int i = 0; i < 1000; i++) + ldr.addData(i, i); + + for (int i = 1000; i < 2000; i++) + ldr.removeData(i); + } + + IgniteCache<Object, Object> cache = ignite.jcache(null); + + for (int i = 0; i < 1000; i++) { + assertNull(storeMap.get(i)); + + assertEquals(i, cache.get(i)); + } + + for (int i = 1000; i < 2000; i++) { + assertEquals(i, storeMap.get(i)); + + assertNull(cache.localPeek(i)); + } + } + finally { + storeMap = null; + } + } + + /** * */ private static class TestObject { @@ -781,4 +855,24 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { return val; } } + + /** + * + */ + private class TestStore extends GridCacheStoreAdapter<Object, Object> { + /** {@inheritDoc} */ + @Nullable @Override public Object load(@Nullable IgniteTx tx, Object key) throws IgniteCheckedException { + return storeMap.get(key); + } + + /** {@inheritDoc} */ + @Override public void put(@Nullable IgniteTx tx, Object key, Object val) throws IgniteCheckedException { + storeMap.put(key, val); + } + + /** {@inheritDoc} */ + @Override public void remove(@Nullable IgniteTx tx, Object key) throws IgniteCheckedException { + storeMap.remove(key); + } + } }