http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java new file mode 100644 index 0000000..5ce42f9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java @@ -0,0 +1,1015 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.store; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.worker.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.thread.*; +import org.jetbrains.annotations.*; +import org.jsr166.*; + +import javax.cache.integration.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +import static javax.cache.Cache.*; + +/** + * Internal wrapper for a {@link CacheStore} that enables write-behind logic. + * <p/> + * The general purpose of this approach is to reduce cache store load under high + * store update rate. The idea is to cache all write and remove operations in a pending + * map and delegate these changes to the underlying store either after timeout or + * if size of a pending map exceeded some pre-configured value. Another performance gain + * is achieved due to combining a group of similar operations to a single batch update. + * <p/> + * The essential flush size for the write-behind cache should be at least the estimated + * count of simultaneously written keys. In case of significantly smaller value there would + * be triggered a lot of flush events that will result in a high cache store load. + * <p/> + * Since write operations to the cache store are deferred, transaction support is lost; no + * transaction objects are passed to the underlying store. + */ +public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, LifecycleAware { + /** Default write cache initial capacity. */ + public static final int DFLT_INITIAL_CAPACITY = 1024; + + /** Overflow ratio for critical cache size calculation. */ + public static final float CACHE_OVERFLOW_RATIO = 1.5f; + + /** Default concurrency level of write cache. */ + public static final int DFLT_CONCUR_LVL = 64; + + /** Write cache initial capacity. */ + private int initCap = DFLT_INITIAL_CAPACITY; + + /** Concurrency level for write cache access. */ + private int concurLvl = DFLT_CONCUR_LVL; + + /** When cache size exceeds this value eldest entry will be stored to the underlying store. */ + private int cacheMaxSize = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_SIZE; + + /** Critical cache size. If cache size exceeds this value, data flush performed synchronously. */ + private int cacheCriticalSize; + + /** Count of worker threads performing underlying store updates. */ + private int flushThreadCnt = CacheConfiguration.DFLT_WRITE_FROM_BEHIND_FLUSH_THREAD_CNT; + + /** Cache flush frequency. All pending operations will be performed in not less then this value ms. */ + private long cacheFlushFreq = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_FREQUENCY; + + /** Maximum batch size for put and remove operations */ + private int batchSize = CacheConfiguration.DFLT_WRITE_BEHIND_BATCH_SIZE; + + /** Grid name. */ + private String gridName; + + /** Cache name. */ + private String cacheName; + + /** Underlying store. */ + private CacheStore<K, V> store; + + /** Write cache. */ + private ConcurrentLinkedHashMap<K, StatefulValue<K, V>> writeCache; + + /** Flusher threads. */ + private GridWorker[] flushThreads; + + /** Atomic flag indicating store shutdown. */ + private AtomicBoolean stopping = new AtomicBoolean(true); + + /** Flush lock. */ + private Lock flushLock = new ReentrantLock(); + + /** Condition to determine records available for flush. */ + private Condition canFlush = flushLock.newCondition(); + + /** Variable for counting total cache overflows. */ + private AtomicInteger cacheTotalOverflowCntr = new AtomicInteger(); + + /** Variable contains current number of overflow events. */ + private AtomicInteger cacheOverflowCntr = new AtomicInteger(); + + /** Variable for counting key-value pairs that are in {@link ValueStatus#RETRY} state. */ + private AtomicInteger retryEntriesCnt = new AtomicInteger(); + + /** Log. */ + private IgniteLogger log; + + /** Store manager. */ + private CacheStoreManager storeMgr; + + /** + * Creates a write-behind cache store for the given store. + * + * @param storeMgr Store manager. + * @param gridName Grid name. + * @param cacheName Cache name. + * @param log Grid logger. + * @param store {@code GridCacheStore} that need to be wrapped. + */ + public GridCacheWriteBehindStore( + CacheStoreManager storeMgr, + String gridName, + String cacheName, + IgniteLogger log, + CacheStore<K, V> store) { + this.storeMgr = storeMgr; + this.gridName = gridName; + this.cacheName = cacheName; + this.log = log; + this.store = store; + } + + /** + * Sets initial capacity for the write cache. + * + * @param initCap Initial capacity. + */ + public void setInitialCapacity(int initCap) { + this.initCap = initCap; + } + + /** + * Sets concurrency level for the write cache. Concurrency level is expected count of concurrent threads + * attempting to update cache. + * + * @param concurLvl Concurrency level. + */ + public void setConcurrencyLevel(int concurLvl) { + this.concurLvl = concurLvl; + } + + /** + * Sets the maximum size of the write cache. When the count of unique keys in write cache exceeds this value, + * the eldest entry in the cache is immediately scheduled for write to the underlying store. + * + * @param cacheMaxSize Max cache size. + */ + public void setFlushSize(int cacheMaxSize) { + this.cacheMaxSize = cacheMaxSize; + } + + /** + * Gets the maximum size of the write-behind buffer. When the count of unique keys + * in write buffer exceeds this value, the buffer is scheduled for write to the underlying store. + * <p/> + * If this value is {@code 0}, then flush is performed only on time-elapsing basis. However, + * when this value is {@code 0}, the cache critical size is set to + * {@link CacheConfiguration#DFLT_WRITE_BEHIND_CRITICAL_SIZE} + * + * @return Buffer size that triggers flush procedure. + */ + public int getWriteBehindFlushSize() { + return cacheMaxSize; + } + + /** + * Sets the number of threads that will perform store update operations. + * + * @param flushThreadCnt Count of worker threads. + */ + public void setFlushThreadCount(int flushThreadCnt) { + this.flushThreadCnt = flushThreadCnt; + } + + /** + * Gets the number of flush threads that will perform store update operations. + * + * @return Count of worker threads. + */ + public int getWriteBehindFlushThreadCount() { + return flushThreadCnt; + } + + /** + * Sets the cache flush frequency. All pending operations on the underlying store will be performed + * within time interval not less then this value. + * + * @param cacheFlushFreq Time interval value in milliseconds. + */ + public void setFlushFrequency(long cacheFlushFreq) { + this.cacheFlushFreq = cacheFlushFreq; + } + + /** + * Gets the cache flush frequency. All pending operations on the underlying store will be performed + * within time interval not less then this value. + * <p/> + * If this value is {@code 0}, then flush is performed only when buffer size exceeds flush size. + * + * @return Flush frequency in milliseconds. + */ + public long getWriteBehindFlushFrequency() { + return cacheFlushFreq; + } + + /** + * Sets the maximum count of similar operations that can be grouped to a single batch. + * + * @param batchSize Maximum count of batch. + */ + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + /** + * Gets the maximum count of similar (put or remove) operations that can be grouped to a single batch. + * + * @return Maximum size of batch. + */ + public int getWriteBehindStoreBatchSize() { + return batchSize; + } + + /** + * Gets count of entries that were processed by the write-behind store and have not been + * flushed to the underlying store yet. + * + * @return Total count of entries in cache store internal buffer. + */ + public int getWriteBehindBufferSize() { + return writeCache.sizex(); + } + + /** + * @return Underlying store. + */ + public CacheStore<K, V> store() { + return store; + } + + /** + * Performs all the initialization logic for write-behind cache store. + * This class must not be used until this method returns. + */ + @Override public void start() { + assert cacheFlushFreq != 0 || cacheMaxSize != 0; + + if (stopping.compareAndSet(true, false)) { + if (log.isDebugEnabled()) + log.debug("Starting write-behind store for cache '" + cacheName + '\''); + + cacheCriticalSize = (int)(cacheMaxSize * CACHE_OVERFLOW_RATIO); + + if (cacheCriticalSize == 0) + cacheCriticalSize = CacheConfiguration.DFLT_WRITE_BEHIND_CRITICAL_SIZE; + + flushThreads = new GridWorker[flushThreadCnt]; + + writeCache = new ConcurrentLinkedHashMap<>(initCap, 0.75f, concurLvl); + + for (int i = 0; i < flushThreads.length; i++) { + flushThreads[i] = new Flusher(gridName, "flusher-" + i, log); + + new IgniteThread(flushThreads[i]).start(); + } + } + } + + /** + * Gets count of write buffer overflow events since initialization. Each overflow event causes + * the ongoing flush operation to be performed synchronously. + * + * @return Count of cache overflow events since start. + */ + public int getWriteBehindTotalCriticalOverflowCount() { + return cacheTotalOverflowCntr.get(); + } + + /** + * Gets count of write buffer overflow events in progress at the moment. Each overflow event causes + * the ongoing flush operation to be performed synchronously. + * + * @return Count of cache overflow events since start. + */ + public int getWriteBehindCriticalOverflowCount() { + return cacheOverflowCntr.get(); + } + + /** + * Gets count of cache entries that are in a store-retry state. An entry is assigned a store-retry state + * when underlying store failed due some reason and cache has enough space to retain this entry till + * the next try. + * + * @return Count of entries in store-retry state. + */ + public int getWriteBehindErrorRetryCount() { + return retryEntriesCnt.get(); + } + + /** + * Performs shutdown logic for store. No put, get and remove requests will be processed after + * this method is called. + */ + @Override public void stop() { + if (stopping.compareAndSet(false, true)) { + if (log.isDebugEnabled()) + log.debug("Stopping write-behind store for cache '" + cacheName + '\''); + + wakeUp(); + + boolean graceful = true; + + for (GridWorker worker : flushThreads) + graceful &= U.join(worker, log); + + if (!graceful) + log.warning("Shutdown was aborted"); + } + } + + /** + * Forces all entries collected to be flushed to the underlying store. + * @throws IgniteCheckedException If failed. + */ + public void forceFlush() throws IgniteCheckedException { + wakeUp(); + } + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) { + store.loadCache(clo, args); + } + + /** {@inheritDoc} */ + @Override public Map<K, V> loadAll(Iterable<? extends K> keys) { + if (log.isDebugEnabled()) + log.debug("Store load all [keys=" + keys + ']'); + + Map<K, V> loaded = new HashMap<>(); + + Collection<K> remaining = new LinkedList<>(); + + for (K key : keys) { + StatefulValue<K, V> val = writeCache.get(key); + + if (val != null) { + val.readLock().lock(); + + try { + if (val.operation() == StoreOperation.PUT) + loaded.put(key, val.entry().getValue()); + else + assert val.operation() == StoreOperation.RMV : val.operation(); + } + finally { + val.readLock().unlock(); + } + } + else + remaining.add(key); + } + + // For items that were not found in queue. + if (!remaining.isEmpty()) { + Map<K, V> loaded0 = store.loadAll(remaining); + + if (loaded0 != null) + loaded.putAll(loaded0); + } + + return loaded; + } + + /** {@inheritDoc} */ + @Override public V load(K key) { + if (log.isDebugEnabled()) + log.debug("Store load [key=" + key + ']'); + + StatefulValue<K, V> val = writeCache.get(key); + + if (val != null) { + val.readLock().lock(); + + try { + switch (val.operation()) { + case PUT: + return val.entry().getValue(); + + case RMV: + return null; + + default: + assert false : "Unexpected operation: " + val.status(); + } + } + finally { + val.readLock().unlock(); + } + } + + return store.load(key); + } + + /** {@inheritDoc} */ + @Override public void writeAll(Collection<Entry<? extends K, ? extends V>> entries) { + for (Entry<? extends K, ? extends V> e : entries) + write(e); + } + + /** {@inheritDoc} */ + @Override public void write(Entry<? extends K, ? extends V> entry) { + try { + if (log.isDebugEnabled()) + log.debug("Store put [key=" + entry.getKey() + ", val=" + entry.getValue() + ']'); + + updateCache(entry.getKey(), entry, StoreOperation.PUT); + } + catch (IgniteInterruptedCheckedException e) { + throw new CacheWriterException(U.convertExceptionNoWrap(e)); + } + } + + /** {@inheritDoc} */ + @Override public void deleteAll(Collection<?> keys) { + for (Object key : keys) + delete(key); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void delete(Object key) { + try { + if (log.isDebugEnabled()) + log.debug("Store remove [key=" + key + ']'); + + updateCache((K)key, null, StoreOperation.RMV); + } + catch (IgniteInterruptedCheckedException e) { + throw new CacheWriterException(U.convertExceptionNoWrap(e)); + } + } + + /** {@inheritDoc} */ + @Override public void sessionEnd(boolean commit) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheWriteBehindStore.class, this); + } + + /** + * Performs flush-consistent cache update for the given key. + * + * @param key Key for which update is performed. + * @param val New value, may be null for remove operation. + * @param operation Updated value status + * @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed. + */ + private void updateCache(K key, + @Nullable Entry<? extends K, ? extends V> val, + StoreOperation operation) + throws IgniteInterruptedCheckedException { + StatefulValue<K, V> newVal = new StatefulValue<>(val, operation); + + StatefulValue<K, V> prev; + + while ((prev = writeCache.putIfAbsent(key, newVal)) != null) { + prev.writeLock().lock(); + + try { + if (prev.status() == ValueStatus.PENDING) { + // Flush process in progress, try again. + prev.waitForFlush(); + + continue; + } + else if (prev.status() == ValueStatus.FLUSHED) + // This entry was deleted from map before we acquired the lock. + continue; + else if (prev.status() == ValueStatus.RETRY) + // New value has come, old value is no longer in RETRY state, + retryEntriesCnt.decrementAndGet(); + + assert prev.status() == ValueStatus.NEW || prev.status() == ValueStatus.RETRY; + + prev.update(val, operation, ValueStatus.NEW); + + break; + } + finally { + prev.writeLock().unlock(); + } + } + + // Now check the map size + if (writeCache.sizex() > cacheCriticalSize) + // Perform single store update in the same thread. + flushSingleValue(); + else if (cacheMaxSize > 0 && writeCache.sizex() > cacheMaxSize) + wakeUp(); + } + + /** + * Flushes one upcoming value to the underlying store. Called from + * {@link #updateCache(Object, Entry, StoreOperation)} method in case when current map size exceeds + * critical size. + */ + private void flushSingleValue() { + cacheOverflowCntr.incrementAndGet(); + + try { + Map<K, StatefulValue<K, V>> batch = null; + + for (Map.Entry<K, StatefulValue<K, V>> e : writeCache.entrySet()) { + StatefulValue<K, V> val = e.getValue(); + + val.writeLock().lock(); + + try { + ValueStatus status = val.status(); + + if (acquired(status)) + // Another thread is helping us, continue to the next entry. + continue; + + if (val.status() == ValueStatus.RETRY) + retryEntriesCnt.decrementAndGet(); + + assert retryEntriesCnt.get() >= 0; + + val.status(ValueStatus.PENDING); + + batch = Collections.singletonMap(e.getKey(), val); + } + finally { + val.writeLock().unlock(); + } + + if (!batch.isEmpty()) { + applyBatch(batch, false); + + cacheTotalOverflowCntr.incrementAndGet(); + + return; + } + } + } + finally { + cacheOverflowCntr.decrementAndGet(); + } + } + + /** + * Performs batch operation on underlying store. + * + * @param valMap Batch map. + * @param initSes {@code True} if need to initialize session. + */ + private void applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes) { + assert valMap.size() <= batchSize; + + StoreOperation operation = null; + + // Construct a map for underlying store + Map<K, Entry<? extends K, ? extends V>> batch = U.newLinkedHashMap(valMap.size()); + + for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) { + if (operation == null) + operation = e.getValue().operation(); + + assert operation == e.getValue().operation(); + + assert e.getValue().status() == ValueStatus.PENDING; + + batch.put(e.getKey(), e.getValue().entry()); + } + + if (updateStore(operation, batch, initSes)) { + for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) { + StatefulValue<K, V> val = e.getValue(); + + val.writeLock().lock(); + + try { + val.status(ValueStatus.FLUSHED); + + StatefulValue<K, V> prev = writeCache.remove(e.getKey()); + + // Additional check to ensure consistency. + assert prev == val : "Map value for key " + e.getKey() + " was updated during flush"; + + val.signalFlushed(); + } + finally { + val.writeLock().unlock(); + } + } + } + else { + // Exception occurred, we must set RETRY status + for (StatefulValue<K, V> val : valMap.values()) { + val.writeLock().lock(); + + try { + val.status(ValueStatus.RETRY); + + retryEntriesCnt.incrementAndGet(); + + val.signalFlushed(); + } + finally { + val.writeLock().unlock(); + } + } + } + } + + /** + * Tries to update store with the given values and returns {@code true} in case of success. + * + * <p/> If any exception in underlying store is occurred, this method checks the map size. + * If map size exceeds some critical value, then it returns {@code true} and this value will + * be lost. If map size does not exceed critical value, it will return false and value will + * be retained in write cache. + * + * @param operation Status indicating operation that should be performed. + * @param vals Key-Value map. + * @param initSes {@code True} if need to initialize session. + * @return {@code true} if value may be deleted from the write cache, + * {@code false} otherwise + */ + private boolean updateStore(StoreOperation operation, + Map<K, Entry<? extends K, ? extends V>> vals, + boolean initSes) { + + if (initSes && storeMgr != null) + storeMgr.writeBehindSessionInit(); + + try { + boolean threwEx = true; + + try { + switch (operation) { + case PUT: + store.writeAll(vals.values()); + + break; + + case RMV: + store.deleteAll(vals.keySet()); + + break; + + default: + assert false : "Unexpected operation: " + operation; + } + + threwEx = false; + + return true; + } + finally { + if (initSes && storeMgr != null) + storeMgr.writeBehindSessionEnd(threwEx); + } + } + catch (Exception e) { + LT.warn(log, e, "Unable to update underlying store: " + store); + + if (writeCache.sizex() > cacheCriticalSize || stopping.get()) { + for (Map.Entry<K, Entry<? extends K, ? extends V>> entry : vals.entrySet()) { + Object val = entry.getValue() != null ? entry.getValue().getValue() : null; + + log.warning("Failed to update store (value will be lost as current buffer size is greater " + + "than 'cacheCriticalSize' or node has been stopped before store was repaired) [key=" + + entry.getKey() + ", val=" + val + ", op=" + operation + "]"); + } + + return true; + } + + return false; + } + } + + /** + * Wakes up flushing threads if map size exceeded maximum value or in case of shutdown. + */ + private void wakeUp() { + flushLock.lock(); + + try { + canFlush.signalAll(); + } + finally { + flushLock.unlock(); + } + } + + /** + * Thread that performs time-based flushing of written values to the underlying storage. + */ + private class Flusher extends GridWorker { + /** {@inheritDoc */ + protected Flusher(String gridName, String name, IgniteLogger log) { + super(gridName, name, log); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + while (!stopping.get() || writeCache.sizex() > 0) { + awaitOperationsAvailable(); + + flushCache(writeCache.entrySet().iterator()); + } + } + + /** + * This method awaits until enough elements in map are available or given timeout is over. + * + * @throws InterruptedException If awaiting was interrupted. + */ + private void awaitOperationsAvailable() throws InterruptedException { + flushLock.lock(); + + try { + do { + if (writeCache.sizex() <= cacheMaxSize || cacheMaxSize == 0) { + if (cacheFlushFreq > 0) + canFlush.await(cacheFlushFreq, TimeUnit.MILLISECONDS); + else + canFlush.await(); + } + } + while (writeCache.sizex() == 0 && !stopping.get()); + } + finally { + flushLock.unlock(); + } + } + + /** + * Removes values from the write cache and performs corresponding operation + * on the underlying store. + * + * @param it Iterator for write cache. + */ + private void flushCache(Iterator<Map.Entry<K,StatefulValue<K, V>>> it) { + StoreOperation operation = null; + + Map<K, StatefulValue<K, V>> batch = null; + Map<K, StatefulValue<K, V>> pending = U.newLinkedHashMap(batchSize); + + while (it.hasNext()) { + Map.Entry<K, StatefulValue<K, V>> e = it.next(); + + StatefulValue<K, V> val = e.getValue(); + + val.writeLock().lock(); + + try { + ValueStatus status = val.status(); + + if (acquired(status)) + // Another thread is helping us, continue to the next entry. + continue; + + if (status == ValueStatus.RETRY) + retryEntriesCnt.decrementAndGet(); + + assert retryEntriesCnt.get() >= 0; + + val.status(ValueStatus.PENDING); + + // We scan for the next operation and apply batch on operation change. Null means new batch. + if (operation == null) + operation = val.operation(); + + if (operation != val.operation()) { + // Operation is changed, so we need to perform a batch. + batch = pending; + pending = U.newLinkedHashMap(batchSize); + + operation = val.operation(); + + pending.put(e.getKey(), val); + } + else + pending.put(e.getKey(), val); + + if (pending.size() == batchSize) { + batch = pending; + pending = U.newLinkedHashMap(batchSize); + + operation = null; + } + } + finally { + val.writeLock().unlock(); + } + + if (batch != null && !batch.isEmpty()) { + applyBatch(batch, true); + batch = null; + } + } + + // Process the remainder. + if (!pending.isEmpty()) + applyBatch(pending, true); + } + } + + /** + * For test purposes only. + * + * @return Write cache for the underlying store operations. + */ + Map<K, StatefulValue<K, V>> writeCache() { + return writeCache; + } + + /** + * Enumeration that represents possible operations on the underlying store. + */ + private enum StoreOperation { + /** Put key-value pair to the underlying store. */ + PUT, + + /** Remove key from the underlying store. */ + RMV + } + + /** + * Enumeration that represents possible states of value in the map. + */ + private enum ValueStatus { + /** Value is scheduled for write or delete from the underlying cache but has not been captured by flusher. */ + NEW, + + /** Value is captured by flusher and store operation is performed at the moment. */ + PENDING, + + /** Store operation has failed and it will be re-tried at the next flush. */ + RETRY, + + /** Store operation succeeded and this value will be removed by flusher. */ + FLUSHED, + } + + /** + * Checks if given status indicates pending or complete flush operation. + * + * @param status Status to check. + * @return {@code true} if status indicates any pending or complete store update operation. + */ + private boolean acquired(ValueStatus status) { + return status == ValueStatus.PENDING || status == ValueStatus.FLUSHED; + } + + /** + * A state-value-operation trio. + * + * @param <V> Value type. + */ + private static class StatefulValue<K, V> extends ReentrantReadWriteLock { + /** */ + private static final long serialVersionUID = 0L; + + /** Value. */ + @GridToStringInclude + private Entry<? extends K, ? extends V> val; + + /** Store operation. */ + private StoreOperation storeOperation; + + /** Value status. */ + private ValueStatus valStatus; + + /** Condition to wait for flush event */ + private Condition flushCond = writeLock().newCondition(); + + /** + * Creates a state-value pair with {@link ValueStatus#NEW} status. + * + * @param val Value. + * @param storeOperation Store operation. + */ + private StatefulValue(Entry<? extends K, ? extends V> val, StoreOperation storeOperation) { + assert storeOperation == StoreOperation.PUT || storeOperation == StoreOperation.RMV; + + this.val = val; + this.storeOperation = storeOperation; + valStatus = ValueStatus.NEW; + } + + /** + * @return Stored value. + */ + private Entry<? extends K, ? extends V> entry() { + return val; + } + + /** + * @return Store operation. + */ + private StoreOperation operation() { + return storeOperation; + } + + /** + * @return Value status + */ + private ValueStatus status() { + return valStatus; + } + + /** + * Updates value status. + * + * @param valStatus Value status. + */ + private void status(ValueStatus valStatus) { + this.valStatus = valStatus; + } + + /** + * Updates both value and value status. + * + * @param val Value. + * @param storeOperation Store operation. + * @param valStatus Value status. + */ + private void update(@Nullable Entry<? extends K, ? extends V> val, + StoreOperation storeOperation, + ValueStatus valStatus) { + this.val = val; + this.storeOperation = storeOperation; + this.valStatus = valStatus; + } + + /** + * Awaits a signal on flush condition + * + * @throws IgniteInterruptedCheckedException If thread was interrupted. + */ + private void waitForFlush() throws IgniteInterruptedCheckedException { + U.await(flushCond); + } + + /** + * Signals flush condition. + */ + @SuppressWarnings({"SignalWithoutCorrespondingAwait"}) + private void signalFlushed() { + flushCond.signalAll(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof StatefulValue)) + return false; + + StatefulValue other = (StatefulValue)o; + + return F.eq(val, other.val) && F.eq(valStatus, other.valStatus); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = val != null ? val.hashCode() : 0; + + res = 31 * res + valStatus.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StatefulValue.class, this); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index acd3202..1b66b4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.processors.cache.store.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.transactions.*; import org.apache.ignite.internal.util.*; @@ -444,11 +445,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter * * @return Store manager. */ - protected GridCacheStoreManager store() { + protected CacheStoreManager store() { if (!activeCacheIds().isEmpty()) { int cacheId = F.first(activeCacheIds()); - GridCacheStoreManager store = cctx.cacheContext(cacheId).store(); + CacheStoreManager store = cctx.cacheContext(cacheId).store(); return store.configured() ? store : null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index f2407ce..10146a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.dr.*; +import org.apache.ignite.internal.processors.cache.store.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.dr.*; import org.apache.ignite.internal.transactions.*; @@ -370,7 +371,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } return new GridFinishedFuture<>( - cacheCtx.store().loadAllFromStore(this, keys, c)); + cacheCtx.store().loadAll(this, keys, c)); } catch (IgniteCheckedException e) { return new GridFinishedFuture<>(e); @@ -387,7 +388,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter return false; } - return cacheCtx.store().loadAllFromStore(IgniteTxLocalAdapter.this, keys, c); + return cacheCtx.store().loadAll(IgniteTxLocalAdapter.this, keys, c); } }, true); @@ -492,17 +493,17 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter */ @SuppressWarnings({"CatchGenericClass"}) protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException { - GridCacheStoreManager store = store(); + CacheStoreManager store = store(); - if (store != null && store.writeThrough() && storeEnabled() && - (!internal() || groupLock()) && (near() || store.writeToStoreFromDht())) { + if (store != null && store.isWriteThrough() && storeEnabled() && + (!internal() || groupLock()) && (near() || store.isWriteToStoreFromDht())) { try { if (writeEntries != null) { Map<Object, IgniteBiTuple<Object, GridCacheVersion>> putMap = null; List<Object> rmvCol = null; - GridCacheStoreManager writeStore = null; + CacheStoreManager writeStore = null; - boolean skipNear = near() && store.writeToStoreFromDht(); + boolean skipNear = near() && store.isWriteToStoreFromDht(); for (IgniteTxEntry e : writeEntries) { if (skipNear && e.cached().isNear()) @@ -527,7 +528,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (rmvCol != null && !rmvCol.isEmpty()) { assert writeStore != null; - writeStore.removeAllFromStore(this, rmvCol); + writeStore.removeAll(this, rmvCol); // Reset. rmvCol.clear(); @@ -537,7 +538,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter // Batch-process puts if cache ID has changed. if (writeStore != null && writeStore != cacheCtx.store() && putMap != null && !putMap.isEmpty()) { - writeStore.putAllToStore(this, putMap); + writeStore.putAll(this, putMap); // Reset. putMap.clear(); @@ -568,7 +569,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (putMap != null && !putMap.isEmpty()) { assert writeStore != null; - writeStore.putAllToStore(this, putMap); + writeStore.putAll(this, putMap); // Reset. putMap.clear(); @@ -577,7 +578,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } if (writeStore != null && writeStore != cacheCtx.store() && rmvCol != null && !rmvCol.isEmpty()) { - writeStore.removeAllFromStore(this, rmvCol); + writeStore.removeAll(this, rmvCol); // Reset. rmvCol.clear(); @@ -609,7 +610,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter assert writeStore != null; // Batch put at the end of transaction. - writeStore.putAllToStore(this, putMap); + writeStore.putAll(this, putMap); } if (rmvCol != null && !rmvCol.isEmpty()) { @@ -617,12 +618,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter assert writeStore != null; // Batch remove at the end of transaction. - writeStore.removeAllFromStore(this, rmvCol); + writeStore.removeAll(this, rmvCol); } } // Commit while locks are held. - store.txEnd(this, true); + store.sessionEnd(this, true); } catch (IgniteCheckedException ex) { commitError(ex); @@ -981,11 +982,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } else { - GridCacheStoreManager store = store(); + CacheStoreManager store = store(); if (store != null && (!internal() || groupLock())) { try { - store.txEnd(this, true); + store.sessionEnd(this, true); } catch (IgniteCheckedException e) { commitError(e); @@ -1086,11 +1087,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cctx.tm().rollbackTx(this); - GridCacheStoreManager store = store(); + CacheStoreManager store = store(); - if (store != null && (near() || store.writeToStoreFromDht())) { + if (store != null && (near() || store.isWriteToStoreFromDht())) { if (!internal() || groupLock()) - store.txEnd(this, false); + store.sessionEnd(this, false); } } catch (Error | IgniteCheckedException | RuntimeException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java index 42586d2..6a8117f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java @@ -156,10 +156,4 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { * with {@link IgniteImmutable} annotation. */ public boolean immutable(Object obj); - - /** - * @param cacheName Cache name. - * @return {@code True} if portable format should be preserved when passing values to cache store. - */ - public boolean keepPortableInStore(@Nullable String cacheName); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index f65b7bd..6e46757 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@ -234,11 +234,6 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme } /** {@inheritDoc} */ - @Override public boolean keepPortableInStore(@Nullable String cacheName) { - return false; - } - - /** {@inheritDoc} */ @Override public void onCacheProcessorStarted() { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java index 69ca1ae..580ff49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java @@ -22,6 +22,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.dr.*; +import org.apache.ignite.internal.processors.cache.store.*; import org.apache.ignite.plugin.*; import java.util.*; @@ -76,10 +77,13 @@ public class CachePluginManager extends GridCacheManagerAdapter { /** * Creates optional component. * + * @param ctx Kernal context. + * @param cfg Cache configuration. * @param cls Component class. * @return Created component. */ - public <T> T createComponent(Class<T> cls) { + @SuppressWarnings("unchecked") + public <T> T createComponent(GridKernalContext ctx, CacheConfiguration cfg, Class<T> cls) { for (CachePluginProvider provider : providers) { T res = (T)provider.createComponent(cls); @@ -91,6 +95,8 @@ public class CachePluginManager extends GridCacheManagerAdapter { return (T)new GridOsCacheDrManager(); else if (cls.equals(CacheConflictResolutionManager.class)) return (T)new CacheOsConflictResolutionManager<>(); + else if (cls.equals(CacheStoreManager.class)) + return (T)new CacheOsStoreManager(ctx, cfg); throw new IgniteException("Unsupported component type: " + cls); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 55c1f9d..d7213a4 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -389,10 +389,10 @@ org.apache.ignite.internal.processors.cache.GridCacheProcessor$LocalAffinityFunc org.apache.ignite.internal.processors.cache.GridCacheProjectionImpl org.apache.ignite.internal.processors.cache.GridCacheProxyImpl org.apache.ignite.internal.processors.cache.GridCacheReturn -org.apache.ignite.internal.processors.cache.GridCacheStoreManager$1 -org.apache.ignite.internal.processors.cache.GridCacheStoreManager$2 -org.apache.ignite.internal.processors.cache.GridCacheStoreManager$3 -org.apache.ignite.internal.processors.cache.GridCacheStoreManager$4 +org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter$1 +org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter$2 +org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter$3 +org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter$4 org.apache.ignite.internal.processors.cache.GridCacheSwapManager$10 org.apache.ignite.internal.processors.cache.GridCacheSwapManager$12 org.apache.ignite.internal.processors.cache.GridCacheSwapManager$14 @@ -433,9 +433,9 @@ org.apache.ignite.internal.processors.cache.GridCacheUtils$8 org.apache.ignite.internal.processors.cache.GridCacheUtils$9 org.apache.ignite.internal.processors.cache.GridCacheValueCollection org.apache.ignite.internal.processors.cache.GridCacheValueCollection$1 -org.apache.ignite.internal.processors.cache.GridCacheWriteBehindStore$StatefulValue -org.apache.ignite.internal.processors.cache.GridCacheWriteBehindStore$StoreOperation -org.apache.ignite.internal.processors.cache.GridCacheWriteBehindStore$ValueStatus +org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$StatefulValue +org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$StoreOperation +org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$ValueStatus org.apache.ignite.internal.processors.cache.IgniteCacheProxy org.apache.ignite.internal.processors.cache.IgniteCacheProxy$1 org.apache.ignite.internal.processors.cache.IgniteCacheProxy$2 http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java index b9f9602..c4ba385 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java @@ -26,14 +26,13 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.junits.common.*; import org.apache.ignite.transactions.*; -import javax.cache.configuration.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; /** * Test that in {@link CacheMode#PARTITIONED} mode cache writes values only to the near cache store. <p/> This check - * is needed because in current implementation if {@link GridCacheWriteBehindStore} assumes that and user store is + * is needed because in current implementation if {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore} assumes that and user store is * wrapped only in near cache (see {@link GridCacheProcessor} init logic). */ @SuppressWarnings({"unchecked"}) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractSelfTest.java deleted file mode 100644 index 883a216..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractSelfTest.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Harness for {@link GridCacheWriteBehindStore} tests. - */ -public abstract class GridCacheWriteBehindStoreAbstractSelfTest extends GridCommonAbstractTest { - /** Write cache size. */ - public static final int CACHE_SIZE = 1024; - - /** Value dump interval. */ - public static final int FLUSH_FREQUENCY = 1000; - - /** Underlying store. */ - protected GridCacheTestStore delegate = new GridCacheTestStore(); - - /** Tested store. */ - protected GridCacheWriteBehindStore<Integer, String> store; - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - delegate = null; - store = null; - - super.afterTestsStopped(); - } - - /** - * Initializes store. - * - * @param flushThreadCnt Count of flush threads - * @throws Exception If failed. - */ - protected void initStore(int flushThreadCnt) throws Exception { - store = new GridCacheWriteBehindStore<>(null, "", "", log, delegate); - - store.setFlushFrequency(FLUSH_FREQUENCY); - - store.setFlushSize(CACHE_SIZE); - - store.setFlushThreadCount(flushThreadCnt); - - delegate.reset(); - - store.start(); - } - - /** - * Shutdowns store. - * - * @throws Exception If failed. - */ - protected void shutdownStore() throws Exception { - store.stop(); - - assertTrue("Store cache must be empty after shutdown", store.writeCache().isEmpty()); - } - - /** - * Performs multiple put, get and remove operations in several threads on a store. After - * all threads finished their operations, returns the total set of keys that should be - * in underlying store. - * - * @param threadCnt Count of threads that should update keys. - * @param keysPerThread Count of unique keys assigned to a thread. - * @return Set of keys that was totally put in store. - * @throws Exception If failed. - */ - protected Set<Integer> runPutGetRemoveMultithreaded(int threadCnt, final int keysPerThread) throws Exception { - final ConcurrentMap<String, Set<Integer>> perThread = new ConcurrentHashMap<>(); - - final AtomicBoolean running = new AtomicBoolean(true); - - final AtomicInteger cntr = new AtomicInteger(); - - final AtomicInteger operations = new AtomicInteger(); - - IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { - @SuppressWarnings({"NullableProblems"}) - @Override public void run() { - // Initialize key set for this thread. - Set<Integer> set = new HashSet<>(); - - Set<Integer> old = perThread.putIfAbsent(Thread.currentThread().getName(), set); - - if (old != null) - set = old; - - List<Integer> original = new ArrayList<>(); - - Random rnd = new Random(); - - for (int i = 0; i < keysPerThread; i++) - original.add(cntr.getAndIncrement()); - - try { - while (running.get()) { - int op = rnd.nextInt(3); - int idx = rnd.nextInt(keysPerThread); - - int key = original.get(idx); - - switch (op) { - case 0: - store.write(new CacheEntryImpl<>(key, "val" + key)); - set.add(key); - - operations.incrementAndGet(); - - break; - - case 1: - store.delete(key); - set.remove(key); - - operations.incrementAndGet(); - - break; - - case 2: - default: - store.write(new CacheEntryImpl<>(key, "broken")); - - String val = store.load(key); - - assertEquals("Invalid intermediate value: " + val, "broken", val); - - store.write(new CacheEntryImpl<>(key, "val" + key)); - - set.add(key); - - // 2 put operations performed here. - operations.incrementAndGet(); - operations.incrementAndGet(); - operations.incrementAndGet(); - - break; - } - } - } - catch (Exception e) { - error("Unexpected exception in put thread", e); - - assert false; - } - } - }, threadCnt, "put"); - - U.sleep(10000); - - running.set(false); - - fut.get(); - - log().info(">>> " + operations + " operations performed totally"); - - Set<Integer> total = new HashSet<>(); - - for (Set<Integer> threadVals : perThread.values()) { - total.addAll(threadVals); - } - - return total; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java deleted file mode 100644 index 305b8bb..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java +++ /dev/null @@ -1,349 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; -import org.apache.ignite.transactions.*; -import org.jetbrains.annotations.*; - -import javax.cache.configuration.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.transactions.TransactionConcurrency.*; -import static org.apache.ignite.transactions.TransactionIsolation.*; - -/** - * Basic store test. - */ -public abstract class GridCacheWriteBehindStoreAbstractTest extends GridCommonAbstractTest { - /** Flush frequency. */ - private static final int WRITE_FROM_BEHIND_FLUSH_FREQUENCY = 1000; - - /** Cache store. */ - private static final GridCacheTestStore store = new GridCacheTestStore(); - - /** - * - */ - protected GridCacheWriteBehindStoreAbstractTest() { - super(true /*start grid. */); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - store.resetTimestamp(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - IgniteCache<?, ?> cache = jcache(); - - if (cache != null) - cache.clear(); - - store.reset(); - } - - /** @return Caching mode. */ - protected abstract CacheMode cacheMode(); - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected final IgniteConfiguration getConfiguration() throws Exception { - IgniteConfiguration c = super.getConfiguration(); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(new TcpDiscoveryVmIpFinder(true)); - - c.setDiscoverySpi(disco); - - CacheConfiguration cc = defaultCacheConfiguration(); - - cc.setCacheMode(cacheMode()); - cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - cc.setSwapEnabled(false); - cc.setAtomicityMode(TRANSACTIONAL); - - cc.setCacheStoreFactory(singletonFactory(store)); - cc.setReadThrough(true); - cc.setWriteThrough(true); - cc.setLoadPreviousValue(true); - - cc.setWriteBehindEnabled(true); - cc.setWriteBehindFlushFrequency(WRITE_FROM_BEHIND_FLUSH_FREQUENCY); - - c.setCacheConfiguration(cc); - - return c; - } - - /** @throws Exception If test fails. */ - public void testWriteThrough() throws Exception { - IgniteCache<Integer, String> cache = jcache(); - - Map<Integer, String> map = store.getMap(); - - assert map.isEmpty(); - - Transaction tx = grid().transactions().txStart(OPTIMISTIC, REPEATABLE_READ); - - try { - for (int i = 1; i <= 10; i++) { - cache.put(i, Integer.toString(i)); - - checkLastMethod(null); - } - - tx.commit(); - } - finally { - tx.close(); - } - - // Need to wait WFB flush timeout. - U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); - - checkLastMethod("putAll"); - - assert cache.size() == 10; - - for (int i = 1; i <= 10; i++) { - String val = map.get(i); - - assert val != null; - assert val.equals(Integer.toString(i)); - } - - store.resetLastMethod(); - - tx = grid().transactions().txStart(); - - try { - for (int i = 1; i <= 10; i++) { - String val = cache.getAndRemove(i); - - checkLastMethod(null); - - assert val != null; - assert val.equals(Integer.toString(i)); - } - - tx.commit(); - } - finally { - tx.close(); - } - - // Need to wait WFB flush timeout. - U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); - - checkLastMethod("removeAll"); - - assert map.isEmpty(); - } - - /** @throws Exception If test failed. */ - public void testReadThrough() throws Exception { - IgniteCache<Integer, String> cache = jcache(); - - Map<Integer, String> map = store.getMap(); - - assert map.isEmpty(); - - try (Transaction tx = grid().transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) { - for (int i = 1; i <= 10; i++) - cache.put(i, Integer.toString(i)); - - checkLastMethod(null); - - tx.commit(); - } - - // Need to wait WFB flush timeout. - U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); - - checkLastMethod("putAll"); - - for (int i = 1; i <= 10; i++) { - String val = map.get(i); - - assert val != null; - assert val.equals(Integer.toString(i)); - } - - cache.clear(); - - assert cache.localSize() == 0; - assert cache.localSize() == 0; - - // Need to wait WFB flush timeout. - U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); - - assert map.size() == 10; - - for (int i = 1; i <= 10; i++) { - // Read through. - String val = cache.get(i); - - checkLastMethod("load"); - - assert val != null; - assert val.equals(Integer.toString(i)); - } - - assert cache.size() == 10; - - cache.clear(); - - assert cache.localSize() == 0; - assert cache.localSize() == 0; - - assert map.size() == 10; - - Set<Integer> keys = new HashSet<>(); - - for (int i = 1; i <= 10; i++) - keys.add(i); - - // Read through. - Map<Integer, String> vals = cache.getAll(keys); - - checkLastMethod("loadAll"); - - assert vals != null; - assert vals.size() == 10; - - for (int i = 1; i <= 10; i++) { - String val = vals.get(i); - - assert val != null; - assert val.equals(Integer.toString(i)); - } - - // Write through. - cache.removeAll(keys); - - // Need to wait WFB flush timeout. - U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); - - checkLastMethod("removeAll"); - - assert cache.localSize() == 0; - assert cache.localSize() == 0; - - assert map.isEmpty(); - } - - /** @throws Exception If failed. */ - public void testMultithreaded() throws Exception { - final ConcurrentMap<String, Set<Integer>> perThread = new ConcurrentHashMap<>(); - - final AtomicBoolean running = new AtomicBoolean(true); - - final IgniteCache<Integer, String> cache = jcache(); - - IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { - @SuppressWarnings({"NullableProblems"}) - @Override public void run() { - // Initialize key set for this thread. - Set<Integer> set = new HashSet<>(); - - Set<Integer> old = perThread.putIfAbsent(Thread.currentThread().getName(), set); - - if (old != null) - set = old; - - Random rnd = new Random(); - - int keyCnt = 20000; - - while (running.get()) { - int op = rnd.nextInt(2); - int key = rnd.nextInt(keyCnt); - - switch (op) { - case 0: - cache.put(key, "val" + key); - set.add(key); - - break; - - case 1: - default: - cache.remove(key); - set.remove(key); - - break; - } - } - } - }, 10, "put"); - - U.sleep(10000); - - running.set(false); - - fut.get(); - - U.sleep(5 * WRITE_FROM_BEHIND_FLUSH_FREQUENCY); - - Map<Integer, String> stored = store.getMap(); - - for (Map.Entry<Integer, String> entry : stored.entrySet()) { - int key = entry.getKey(); - - assertEquals("Invalid value for key " + key, "val" + key, entry.getValue()); - - boolean found = false; - - for (Set<Integer> threadPuts : perThread.values()) { - if (threadPuts.contains(key)) { - found = true; - - break; - } - } - - assert found : "No threads found that put key " + key; - } - } - - /** @param mtd Expected last method value. */ - private void checkLastMethod(@Nullable String mtd) { - String lastMtd = store.getLastMethod(); - - if (mtd == null) - assert lastMtd == null : "Last method must be null: " + lastMtd; - else { - assert lastMtd != null : "Last method must be not null"; - assert lastMtd.equals(mtd) : "Last method does not match [expected=" + mtd + ", lastMtd=" + lastMtd + ']'; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java deleted file mode 100644 index 6c050ca..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.cache.*; - -/** - * Tests {@link GridCacheWriteBehindStore} in grid configuration. - */ -public class GridCacheWriteBehindStoreLocalTest extends GridCacheWriteBehindStoreAbstractTest { - /** {@inheritDoc} */ - @Override protected CacheMode cacheMode() { - return CacheMode.LOCAL; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java deleted file mode 100644 index 9607784..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; - -/** - * Multithreaded tests for {@link GridCacheWriteBehindStore}. - */ -public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest { - /** - * This test performs complex set of operations on store from multiple threads. - * - * @throws Exception If failed. - */ - public void testPutGetRemove() throws Exception { - initStore(2); - - Set<Integer> exp; - - try { - exp = runPutGetRemoveMultithreaded(10, 10); - } - finally { - shutdownStore(); - } - - Map<Integer, String> map = delegate.getMap(); - - Collection<Integer> extra = new HashSet<>(map.keySet()); - - extra.removeAll(exp); - - assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty()); - - Collection<Integer> missing = new HashSet<>(exp); - - missing.removeAll(map.keySet()); - - assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty()); - - for (Integer key : exp) - assertEquals("Invalid value for key " + key, "val" + key, map.get(key)); - } - - /** - * Tests that cache would keep values if underlying store fails. - * - * @throws Exception If failed. - */ - public void testStoreFailure() throws Exception { - delegate.setShouldFail(true); - - initStore(2); - - Set<Integer> exp; - - try { - exp = runPutGetRemoveMultithreaded(10, 10); - - U.sleep(FLUSH_FREQUENCY); - - info(">>> There are " + store.getWriteBehindErrorRetryCount() + " entries in RETRY state"); - - delegate.setShouldFail(false); - - // Despite that we set shouldFail flag to false, flush thread may just have caught an exception. - // If we move store to the stopping state right away, this value will be lost. That's why this sleep - // is inserted here to let all exception handlers in write-behind store exit. - U.sleep(1000); - } - finally { - shutdownStore(); - } - - Map<Integer, String> map = delegate.getMap(); - - Collection<Integer> extra = new HashSet<>(map.keySet()); - - extra.removeAll(exp); - - assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty()); - - Collection<Integer> missing = new HashSet<>(exp); - - missing.removeAll(map.keySet()); - - assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty()); - - for (Integer key : exp) - assertEquals("Invalid value for key " + key, "val" + key, map.get(key)); - } - - /** - * Tests store consistency in case of high put rate, when flush is performed from the same thread - * as put or remove operation. - * - * @throws Exception If failed. - */ - public void testFlushFromTheSameThread() throws Exception { - // 50 milliseconds should be enough. - delegate.setOperationDelay(50); - - initStore(2); - - Set<Integer> exp; - - int start = store.getWriteBehindTotalCriticalOverflowCount(); - - try { - //We will have in total 5 * CACHE_SIZE keys that should be enough to grow map size to critical value. - exp = runPutGetRemoveMultithreaded(5, CACHE_SIZE); - } - finally { - log.info(">>> Done inserting, shutting down the store"); - - shutdownStore(); - } - - // Restore delay. - delegate.setOperationDelay(0); - - Map<Integer, String> map = delegate.getMap(); - - int end = store.getWriteBehindTotalCriticalOverflowCount(); - - log.info(">>> There are " + exp.size() + " keys in store, " + (end - start) + " overflows detected"); - - assertTrue("No cache overflows detected (a bug or too few keys or too few delay?)", end > start); - - Collection<Integer> extra = new HashSet<>(map.keySet()); - - extra.removeAll(exp); - - assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty()); - - Collection<Integer> missing = new HashSet<>(exp); - - missing.removeAll(map.keySet()); - - assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty()); - - for (Integer key : exp) - assertEquals("Invalid value for key " + key, "val" + key, map.get(key)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java deleted file mode 100644 index 8fb4f68..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.store.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; -import org.apache.ignite.transactions.*; - -import java.util.*; - -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.transactions.TransactionConcurrency.*; -import static org.apache.ignite.transactions.TransactionIsolation.*; - -/** - * Tests write-behind store with near and dht commit option. - */ -public class GridCacheWriteBehindStorePartitionedMultiNodeSelfTest extends GridCommonAbstractTest { - /** Grids to start. */ - private static final int GRID_CNT = 5; - - /** Ip finder. */ - private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** Flush frequency. */ - public static final int WRITE_BEHIND_FLUSH_FREQ = 1000; - - /** Stores per grid. */ - private GridCacheTestStore[] stores = new GridCacheTestStore[GRID_CNT]; - - /** Start grid counter. */ - private int idx; - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration c = super.getConfiguration(gridName); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(ipFinder); - - c.setDiscoverySpi(disco); - - CacheConfiguration cc = defaultCacheConfiguration(); - - cc.setCacheMode(CacheMode.PARTITIONED); - cc.setWriteBehindEnabled(true); - cc.setWriteBehindFlushFrequency(WRITE_BEHIND_FLUSH_FREQ); - cc.setAtomicityMode(TRANSACTIONAL); - cc.setNearConfiguration(new NearCacheConfiguration()); - - CacheStore store = stores[idx] = new GridCacheTestStore(); - - cc.setCacheStoreFactory(singletonFactory(store)); - cc.setReadThrough(true); - cc.setWriteThrough(true); - cc.setLoadPreviousValue(true); - - c.setCacheConfiguration(cc); - - idx++; - - return c; - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stores = null; - - super.afterTestsStopped(); - } - - /** - * @throws Exception If failed. - */ - private void prepare() throws Exception { - idx = 0; - - startGrids(GRID_CNT); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - super.afterTest(); - - stopAllGrids(); - } - - /** - * @throws Exception If failed. - */ - public void testSingleWritesOnDhtNode() throws Exception { - checkSingleWrites(); - } - - /** - * @throws Exception If failed. - */ - public void testBatchWritesOnDhtNode() throws Exception { - checkBatchWrites(); - } - - /** - * @throws Exception If failed. - */ - public void testTxWritesOnDhtNode() throws Exception { - checkTxWrites(); - } - - /** - * @throws Exception If failed. - */ - private void checkSingleWrites() throws Exception { - prepare(); - - IgniteCache<Integer, String> cache = grid(0).cache(null); - - for (int i = 0; i < 100; i++) - cache.put(i, String.valueOf(i)); - - checkWrites(); - } - - /** - * @throws Exception If failed. - */ - private void checkBatchWrites() throws Exception { - prepare(); - - Map<Integer, String> map = new HashMap<>(); - - for (int i = 0; i < 100; i++) - map.put(i, String.valueOf(i)); - - grid(0).cache(null).putAll(map); - - checkWrites(); - } - - /** - * @throws Exception If failed. - */ - private void checkTxWrites() throws Exception { - prepare(); - - IgniteCache<Object, Object> cache = grid(0).cache(null); - - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { - for (int i = 0; i < 100; i++) - cache.put(i, String.valueOf(i)); - - tx.commit(); - } - - checkWrites(); - } - - /** - * @throws IgniteInterruptedCheckedException If sleep was interrupted. - */ - private void checkWrites() throws IgniteInterruptedCheckedException { - U.sleep(WRITE_BEHIND_FLUSH_FREQ * 2); - - Collection<Integer> allKeys = new ArrayList<>(100); - - for (int i = 0; i < GRID_CNT; i++) { - Map<Integer,String> map = stores[i].getMap(); - - assertFalse("Missing writes for node: " + i, map.isEmpty()); - - allKeys.addAll(map.keySet()); - - // Check there is no intersection. - for (int j = 0; j < GRID_CNT; j++) { - if (i == j) - continue; - - Collection<Integer> intersection = new HashSet<>(stores[j].getMap().keySet()); - - intersection.retainAll(map.keySet()); - - assertTrue(intersection.isEmpty()); - } - } - - assertEquals(100, allKeys.size()); - - for (int i = 0; i < 100; i++) - assertTrue(allKeys.contains(i)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java deleted file mode 100644 index f9e454f..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.cache.*; - -/** - * Tests {@link GridCacheWriteBehindStore} in partitioned configuration. - */ -public class GridCacheWriteBehindStorePartitionedTest extends GridCacheWriteBehindStoreAbstractTest { - /** {@inheritDoc} */ - @Override protected CacheMode cacheMode() { - return CacheMode.PARTITIONED; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java deleted file mode 100644 index c809f90..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.cache.*; - -/** - * Tests {@link GridCacheWriteBehindStore} in grid configuration. - */ -public class GridCacheWriteBehindStoreReplicatedTest extends GridCacheWriteBehindStoreAbstractTest { - /** {@inheritDoc} */ - @Override protected CacheMode cacheMode() { - return CacheMode.REPLICATED; - } -}