http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStore.java deleted file mode 100644 index c26c34e..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStore.java +++ /dev/null @@ -1,1015 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.store.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.worker.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.lifecycle.*; -import org.apache.ignite.thread.*; -import org.jetbrains.annotations.*; -import org.jsr166.*; - -import javax.cache.integration.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; - -import static javax.cache.Cache.*; - -/** - * Internal wrapper for a {@link CacheStore} that enables write-behind logic. - * <p/> - * The general purpose of this approach is to reduce cache store load under high - * store update rate. The idea is to cache all write and remove operations in a pending - * map and delegate these changes to the underlying store either after timeout or - * if size of a pending map exceeded some pre-configured value. Another performance gain - * is achieved due to combining a group of similar operations to a single batch update. - * <p/> - * The essential flush size for the write-behind cache should be at least the estimated - * count of simultaneously written keys. In case of significantly smaller value there would - * be triggered a lot of flush events that will result in a high cache store load. - * <p/> - * Since write operations to the cache store are deferred, transaction support is lost; no - * transaction objects are passed to the underlying store. - */ -public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, LifecycleAware { - /** Default write cache initial capacity. */ - public static final int DFLT_INITIAL_CAPACITY = 1024; - - /** Overflow ratio for critical cache size calculation. */ - public static final float CACHE_OVERFLOW_RATIO = 1.5f; - - /** Default concurrency level of write cache. */ - public static final int DFLT_CONCUR_LVL = 64; - - /** Write cache initial capacity. */ - private int initCap = DFLT_INITIAL_CAPACITY; - - /** Concurrency level for write cache access. */ - private int concurLvl = DFLT_CONCUR_LVL; - - /** When cache size exceeds this value eldest entry will be stored to the underlying store. */ - private int cacheMaxSize = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_SIZE; - - /** Critical cache size. If cache size exceeds this value, data flush performed synchronously. */ - private int cacheCriticalSize; - - /** Count of worker threads performing underlying store updates. */ - private int flushThreadCnt = CacheConfiguration.DFLT_WRITE_FROM_BEHIND_FLUSH_THREAD_CNT; - - /** Cache flush frequency. All pending operations will be performed in not less then this value ms. */ - private long cacheFlushFreq = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_FREQUENCY; - - /** Maximum batch size for put and remove operations */ - private int batchSize = CacheConfiguration.DFLT_WRITE_BEHIND_BATCH_SIZE; - - /** Grid name. */ - private String gridName; - - /** Cache name. */ - private String cacheName; - - /** Underlying store. */ - private CacheStore<K, V> store; - - /** Write cache. */ - private ConcurrentLinkedHashMap<K, StatefulValue<K, V>> writeCache; - - /** Flusher threads. */ - private GridWorker[] flushThreads; - - /** Atomic flag indicating store shutdown. */ - private AtomicBoolean stopping = new AtomicBoolean(true); - - /** Flush lock. */ - private Lock flushLock = new ReentrantLock(); - - /** Condition to determine records available for flush. */ - private Condition canFlush = flushLock.newCondition(); - - /** Variable for counting total cache overflows. */ - private AtomicInteger cacheTotalOverflowCntr = new AtomicInteger(); - - /** Variable contains current number of overflow events. */ - private AtomicInteger cacheOverflowCntr = new AtomicInteger(); - - /** Variable for counting key-value pairs that are in {@link ValueStatus#RETRY} state. */ - private AtomicInteger retryEntriesCnt = new AtomicInteger(); - - /** Log. */ - private IgniteLogger log; - - /** Store manager. */ - private GridCacheStoreManager storeMgr; - - /** - * Creates a write-behind cache store for the given store. - * - * @param storeMgr Store manager. - * @param gridName Grid name. - * @param cacheName Cache name. - * @param log Grid logger. - * @param store {@code GridCacheStore} that need to be wrapped. - */ - public GridCacheWriteBehindStore( - GridCacheStoreManager storeMgr, - String gridName, - String cacheName, - IgniteLogger log, - CacheStore<K, V> store) { - this.storeMgr = storeMgr; - this.gridName = gridName; - this.cacheName = cacheName; - this.log = log; - this.store = store; - } - - /** - * Sets initial capacity for the write cache. - * - * @param initCap Initial capacity. - */ - public void setInitialCapacity(int initCap) { - this.initCap = initCap; - } - - /** - * Sets concurrency level for the write cache. Concurrency level is expected count of concurrent threads - * attempting to update cache. - * - * @param concurLvl Concurrency level. - */ - public void setConcurrencyLevel(int concurLvl) { - this.concurLvl = concurLvl; - } - - /** - * Sets the maximum size of the write cache. When the count of unique keys in write cache exceeds this value, - * the eldest entry in the cache is immediately scheduled for write to the underlying store. - * - * @param cacheMaxSize Max cache size. - */ - public void setFlushSize(int cacheMaxSize) { - this.cacheMaxSize = cacheMaxSize; - } - - /** - * Gets the maximum size of the write-behind buffer. When the count of unique keys - * in write buffer exceeds this value, the buffer is scheduled for write to the underlying store. - * <p/> - * If this value is {@code 0}, then flush is performed only on time-elapsing basis. However, - * when this value is {@code 0}, the cache critical size is set to - * {@link CacheConfiguration#DFLT_WRITE_BEHIND_CRITICAL_SIZE} - * - * @return Buffer size that triggers flush procedure. - */ - public int getWriteBehindFlushSize() { - return cacheMaxSize; - } - - /** - * Sets the number of threads that will perform store update operations. - * - * @param flushThreadCnt Count of worker threads. - */ - public void setFlushThreadCount(int flushThreadCnt) { - this.flushThreadCnt = flushThreadCnt; - } - - /** - * Gets the number of flush threads that will perform store update operations. - * - * @return Count of worker threads. - */ - public int getWriteBehindFlushThreadCount() { - return flushThreadCnt; - } - - /** - * Sets the cache flush frequency. All pending operations on the underlying store will be performed - * within time interval not less then this value. - * - * @param cacheFlushFreq Time interval value in milliseconds. - */ - public void setFlushFrequency(long cacheFlushFreq) { - this.cacheFlushFreq = cacheFlushFreq; - } - - /** - * Gets the cache flush frequency. All pending operations on the underlying store will be performed - * within time interval not less then this value. - * <p/> - * If this value is {@code 0}, then flush is performed only when buffer size exceeds flush size. - * - * @return Flush frequency in milliseconds. - */ - public long getWriteBehindFlushFrequency() { - return cacheFlushFreq; - } - - /** - * Sets the maximum count of similar operations that can be grouped to a single batch. - * - * @param batchSize Maximum count of batch. - */ - public void setBatchSize(int batchSize) { - this.batchSize = batchSize; - } - - /** - * Gets the maximum count of similar (put or remove) operations that can be grouped to a single batch. - * - * @return Maximum size of batch. - */ - public int getWriteBehindStoreBatchSize() { - return batchSize; - } - - /** - * Gets count of entries that were processed by the write-behind store and have not been - * flushed to the underlying store yet. - * - * @return Total count of entries in cache store internal buffer. - */ - public int getWriteBehindBufferSize() { - return writeCache.sizex(); - } - - /** - * @return Underlying store. - */ - public CacheStore<K, V> store() { - return store; - } - - /** - * Performs all the initialization logic for write-behind cache store. - * This class must not be used until this method returns. - */ - @Override public void start() { - assert cacheFlushFreq != 0 || cacheMaxSize != 0; - - if (stopping.compareAndSet(true, false)) { - if (log.isDebugEnabled()) - log.debug("Starting write-behind store for cache '" + cacheName + '\''); - - cacheCriticalSize = (int)(cacheMaxSize * CACHE_OVERFLOW_RATIO); - - if (cacheCriticalSize == 0) - cacheCriticalSize = CacheConfiguration.DFLT_WRITE_BEHIND_CRITICAL_SIZE; - - flushThreads = new GridWorker[flushThreadCnt]; - - writeCache = new ConcurrentLinkedHashMap<>(initCap, 0.75f, concurLvl); - - for (int i = 0; i < flushThreads.length; i++) { - flushThreads[i] = new Flusher(gridName, "flusher-" + i, log); - - new IgniteThread(flushThreads[i]).start(); - } - } - } - - /** - * Gets count of write buffer overflow events since initialization. Each overflow event causes - * the ongoing flush operation to be performed synchronously. - * - * @return Count of cache overflow events since start. - */ - public int getWriteBehindTotalCriticalOverflowCount() { - return cacheTotalOverflowCntr.get(); - } - - /** - * Gets count of write buffer overflow events in progress at the moment. Each overflow event causes - * the ongoing flush operation to be performed synchronously. - * - * @return Count of cache overflow events since start. - */ - public int getWriteBehindCriticalOverflowCount() { - return cacheOverflowCntr.get(); - } - - /** - * Gets count of cache entries that are in a store-retry state. An entry is assigned a store-retry state - * when underlying store failed due some reason and cache has enough space to retain this entry till - * the next try. - * - * @return Count of entries in store-retry state. - */ - public int getWriteBehindErrorRetryCount() { - return retryEntriesCnt.get(); - } - - /** - * Performs shutdown logic for store. No put, get and remove requests will be processed after - * this method is called. - */ - @Override public void stop() { - if (stopping.compareAndSet(false, true)) { - if (log.isDebugEnabled()) - log.debug("Stopping write-behind store for cache '" + cacheName + '\''); - - wakeUp(); - - boolean graceful = true; - - for (GridWorker worker : flushThreads) - graceful &= U.join(worker, log); - - if (!graceful) - log.warning("Shutdown was aborted"); - } - } - - /** - * Forces all entries collected to be flushed to the underlying store. - * @throws IgniteCheckedException If failed. - */ - public void forceFlush() throws IgniteCheckedException { - wakeUp(); - } - - /** {@inheritDoc} */ - @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) { - store.loadCache(clo, args); - } - - /** {@inheritDoc} */ - @Override public Map<K, V> loadAll(Iterable<? extends K> keys) { - if (log.isDebugEnabled()) - log.debug("Store load all [keys=" + keys + ']'); - - Map<K, V> loaded = new HashMap<>(); - - Collection<K> remaining = new LinkedList<>(); - - for (K key : keys) { - StatefulValue<K, V> val = writeCache.get(key); - - if (val != null) { - val.readLock().lock(); - - try { - if (val.operation() == StoreOperation.PUT) - loaded.put(key, val.entry().getValue()); - else - assert val.operation() == StoreOperation.RMV : val.operation(); - } - finally { - val.readLock().unlock(); - } - } - else - remaining.add(key); - } - - // For items that were not found in queue. - if (!remaining.isEmpty()) { - Map<K, V> loaded0 = store.loadAll(remaining); - - if (loaded0 != null) - loaded.putAll(loaded0); - } - - return loaded; - } - - /** {@inheritDoc} */ - @Override public V load(K key) { - if (log.isDebugEnabled()) - log.debug("Store load [key=" + key + ']'); - - StatefulValue<K, V> val = writeCache.get(key); - - if (val != null) { - val.readLock().lock(); - - try { - switch (val.operation()) { - case PUT: - return val.entry().getValue(); - - case RMV: - return null; - - default: - assert false : "Unexpected operation: " + val.status(); - } - } - finally { - val.readLock().unlock(); - } - } - - return store.load(key); - } - - /** {@inheritDoc} */ - @Override public void writeAll(Collection<Entry<? extends K, ? extends V>> entries) { - for (Entry<? extends K, ? extends V> e : entries) - write(e); - } - - /** {@inheritDoc} */ - @Override public void write(Entry<? extends K, ? extends V> entry) { - try { - if (log.isDebugEnabled()) - log.debug("Store put [key=" + entry.getKey() + ", val=" + entry.getValue() + ']'); - - updateCache(entry.getKey(), entry, StoreOperation.PUT); - } - catch (IgniteInterruptedCheckedException e) { - throw new CacheWriterException(U.convertExceptionNoWrap(e)); - } - } - - /** {@inheritDoc} */ - @Override public void deleteAll(Collection<?> keys) { - for (Object key : keys) - delete(key); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void delete(Object key) { - try { - if (log.isDebugEnabled()) - log.debug("Store remove [key=" + key + ']'); - - updateCache((K)key, null, StoreOperation.RMV); - } - catch (IgniteInterruptedCheckedException e) { - throw new CacheWriterException(U.convertExceptionNoWrap(e)); - } - } - - /** {@inheritDoc} */ - @Override public void sessionEnd(boolean commit) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheWriteBehindStore.class, this); - } - - /** - * Performs flush-consistent cache update for the given key. - * - * @param key Key for which update is performed. - * @param val New value, may be null for remove operation. - * @param operation Updated value status - * @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed. - */ - private void updateCache(K key, - @Nullable Entry<? extends K, ? extends V> val, - StoreOperation operation) - throws IgniteInterruptedCheckedException { - StatefulValue<K, V> newVal = new StatefulValue<>(val, operation); - - StatefulValue<K, V> prev; - - while ((prev = writeCache.putIfAbsent(key, newVal)) != null) { - prev.writeLock().lock(); - - try { - if (prev.status() == ValueStatus.PENDING) { - // Flush process in progress, try again. - prev.waitForFlush(); - - continue; - } - else if (prev.status() == ValueStatus.FLUSHED) - // This entry was deleted from map before we acquired the lock. - continue; - else if (prev.status() == ValueStatus.RETRY) - // New value has come, old value is no longer in RETRY state, - retryEntriesCnt.decrementAndGet(); - - assert prev.status() == ValueStatus.NEW || prev.status() == ValueStatus.RETRY; - - prev.update(val, operation, ValueStatus.NEW); - - break; - } - finally { - prev.writeLock().unlock(); - } - } - - // Now check the map size - if (writeCache.sizex() > cacheCriticalSize) - // Perform single store update in the same thread. - flushSingleValue(); - else if (cacheMaxSize > 0 && writeCache.sizex() > cacheMaxSize) - wakeUp(); - } - - /** - * Flushes one upcoming value to the underlying store. Called from - * {@link #updateCache(Object, Entry, StoreOperation)} method in case when current map size exceeds - * critical size. - */ - private void flushSingleValue() { - cacheOverflowCntr.incrementAndGet(); - - try { - Map<K, StatefulValue<K, V>> batch = null; - - for (Map.Entry<K, StatefulValue<K, V>> e : writeCache.entrySet()) { - StatefulValue<K, V> val = e.getValue(); - - val.writeLock().lock(); - - try { - ValueStatus status = val.status(); - - if (acquired(status)) - // Another thread is helping us, continue to the next entry. - continue; - - if (val.status() == ValueStatus.RETRY) - retryEntriesCnt.decrementAndGet(); - - assert retryEntriesCnt.get() >= 0; - - val.status(ValueStatus.PENDING); - - batch = Collections.singletonMap(e.getKey(), val); - } - finally { - val.writeLock().unlock(); - } - - if (!batch.isEmpty()) { - applyBatch(batch, false); - - cacheTotalOverflowCntr.incrementAndGet(); - - return; - } - } - } - finally { - cacheOverflowCntr.decrementAndGet(); - } - } - - /** - * Performs batch operation on underlying store. - * - * @param valMap Batch map. - * @param initSes {@code True} if need to initialize session. - */ - private void applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes) { - assert valMap.size() <= batchSize; - - StoreOperation operation = null; - - // Construct a map for underlying store - Map<K, Entry<? extends K, ? extends V>> batch = U.newLinkedHashMap(valMap.size()); - - for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) { - if (operation == null) - operation = e.getValue().operation(); - - assert operation == e.getValue().operation(); - - assert e.getValue().status() == ValueStatus.PENDING; - - batch.put(e.getKey(), e.getValue().entry()); - } - - if (updateStore(operation, batch, initSes)) { - for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) { - StatefulValue<K, V> val = e.getValue(); - - val.writeLock().lock(); - - try { - val.status(ValueStatus.FLUSHED); - - StatefulValue<K, V> prev = writeCache.remove(e.getKey()); - - // Additional check to ensure consistency. - assert prev == val : "Map value for key " + e.getKey() + " was updated during flush"; - - val.signalFlushed(); - } - finally { - val.writeLock().unlock(); - } - } - } - else { - // Exception occurred, we must set RETRY status - for (StatefulValue<K, V> val : valMap.values()) { - val.writeLock().lock(); - - try { - val.status(ValueStatus.RETRY); - - retryEntriesCnt.incrementAndGet(); - - val.signalFlushed(); - } - finally { - val.writeLock().unlock(); - } - } - } - } - - /** - * Tries to update store with the given values and returns {@code true} in case of success. - * - * <p/> If any exception in underlying store is occurred, this method checks the map size. - * If map size exceeds some critical value, then it returns {@code true} and this value will - * be lost. If map size does not exceed critical value, it will return false and value will - * be retained in write cache. - * - * @param operation Status indicating operation that should be performed. - * @param vals Key-Value map. - * @param initSes {@code True} if need to initialize session. - * @return {@code true} if value may be deleted from the write cache, - * {@code false} otherwise - */ - private boolean updateStore(StoreOperation operation, - Map<K, Entry<? extends K, ? extends V>> vals, - boolean initSes) { - - if (initSes && storeMgr != null) - storeMgr.initSession(null); - - try { - boolean threwEx = true; - - try { - switch (operation) { - case PUT: - store.writeAll(vals.values()); - - break; - - case RMV: - store.deleteAll(vals.keySet()); - - break; - - default: - assert false : "Unexpected operation: " + operation; - } - - threwEx = false; - - return true; - } - finally { - if (initSes && storeMgr != null) - storeMgr.endSession(null, threwEx); - } - } - catch (Exception e) { - LT.warn(log, e, "Unable to update underlying store: " + store); - - if (writeCache.sizex() > cacheCriticalSize || stopping.get()) { - for (Map.Entry<K, Entry<? extends K, ? extends V>> entry : vals.entrySet()) { - Object val = entry.getValue() != null ? entry.getValue().getValue() : null; - - log.warning("Failed to update store (value will be lost as current buffer size is greater " + - "than 'cacheCriticalSize' or node has been stopped before store was repaired) [key=" + - entry.getKey() + ", val=" + val + ", op=" + operation + "]"); - } - - return true; - } - - return false; - } - } - - /** - * Wakes up flushing threads if map size exceeded maximum value or in case of shutdown. - */ - private void wakeUp() { - flushLock.lock(); - - try { - canFlush.signalAll(); - } - finally { - flushLock.unlock(); - } - } - - /** - * Thread that performs time-based flushing of written values to the underlying storage. - */ - private class Flusher extends GridWorker { - /** {@inheritDoc */ - protected Flusher(String gridName, String name, IgniteLogger log) { - super(gridName, name, log); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - while (!stopping.get() || writeCache.sizex() > 0) { - awaitOperationsAvailable(); - - flushCache(writeCache.entrySet().iterator()); - } - } - - /** - * This method awaits until enough elements in map are available or given timeout is over. - * - * @throws InterruptedException If awaiting was interrupted. - */ - private void awaitOperationsAvailable() throws InterruptedException { - flushLock.lock(); - - try { - do { - if (writeCache.sizex() <= cacheMaxSize || cacheMaxSize == 0) { - if (cacheFlushFreq > 0) - canFlush.await(cacheFlushFreq, TimeUnit.MILLISECONDS); - else - canFlush.await(); - } - } - while (writeCache.sizex() == 0 && !stopping.get()); - } - finally { - flushLock.unlock(); - } - } - - /** - * Removes values from the write cache and performs corresponding operation - * on the underlying store. - * - * @param it Iterator for write cache. - */ - private void flushCache(Iterator<Map.Entry<K,StatefulValue<K, V>>> it) { - StoreOperation operation = null; - - Map<K, StatefulValue<K, V>> batch = null; - Map<K, StatefulValue<K, V>> pending = U.newLinkedHashMap(batchSize); - - while (it.hasNext()) { - Map.Entry<K, StatefulValue<K, V>> e = it.next(); - - StatefulValue<K, V> val = e.getValue(); - - val.writeLock().lock(); - - try { - ValueStatus status = val.status(); - - if (acquired(status)) - // Another thread is helping us, continue to the next entry. - continue; - - if (status == ValueStatus.RETRY) - retryEntriesCnt.decrementAndGet(); - - assert retryEntriesCnt.get() >= 0; - - val.status(ValueStatus.PENDING); - - // We scan for the next operation and apply batch on operation change. Null means new batch. - if (operation == null) - operation = val.operation(); - - if (operation != val.operation()) { - // Operation is changed, so we need to perform a batch. - batch = pending; - pending = U.newLinkedHashMap(batchSize); - - operation = val.operation(); - - pending.put(e.getKey(), val); - } - else - pending.put(e.getKey(), val); - - if (pending.size() == batchSize) { - batch = pending; - pending = U.newLinkedHashMap(batchSize); - - operation = null; - } - } - finally { - val.writeLock().unlock(); - } - - if (batch != null && !batch.isEmpty()) { - applyBatch(batch, true); - batch = null; - } - } - - // Process the remainder. - if (!pending.isEmpty()) - applyBatch(pending, true); - } - } - - /** - * For test purposes only. - * - * @return Write cache for the underlying store operations. - */ - Map<K, StatefulValue<K, V>> writeCache() { - return writeCache; - } - - /** - * Enumeration that represents possible operations on the underlying store. - */ - private enum StoreOperation { - /** Put key-value pair to the underlying store. */ - PUT, - - /** Remove key from the underlying store. */ - RMV - } - - /** - * Enumeration that represents possible states of value in the map. - */ - private enum ValueStatus { - /** Value is scheduled for write or delete from the underlying cache but has not been captured by flusher. */ - NEW, - - /** Value is captured by flusher and store operation is performed at the moment. */ - PENDING, - - /** Store operation has failed and it will be re-tried at the next flush. */ - RETRY, - - /** Store operation succeeded and this value will be removed by flusher. */ - FLUSHED, - } - - /** - * Checks if given status indicates pending or complete flush operation. - * - * @param status Status to check. - * @return {@code true} if status indicates any pending or complete store update operation. - */ - private boolean acquired(ValueStatus status) { - return status == ValueStatus.PENDING || status == ValueStatus.FLUSHED; - } - - /** - * A state-value-operation trio. - * - * @param <V> Value type. - */ - private static class StatefulValue<K, V> extends ReentrantReadWriteLock { - /** */ - private static final long serialVersionUID = 0L; - - /** Value. */ - @GridToStringInclude - private Entry<? extends K, ? extends V> val; - - /** Store operation. */ - private StoreOperation storeOperation; - - /** Value status. */ - private ValueStatus valStatus; - - /** Condition to wait for flush event */ - private Condition flushCond = writeLock().newCondition(); - - /** - * Creates a state-value pair with {@link ValueStatus#NEW} status. - * - * @param val Value. - * @param storeOperation Store operation. - */ - private StatefulValue(Entry<? extends K, ? extends V> val, StoreOperation storeOperation) { - assert storeOperation == StoreOperation.PUT || storeOperation == StoreOperation.RMV; - - this.val = val; - this.storeOperation = storeOperation; - valStatus = ValueStatus.NEW; - } - - /** - * @return Stored value. - */ - private Entry<? extends K, ? extends V> entry() { - return val; - } - - /** - * @return Store operation. - */ - private StoreOperation operation() { - return storeOperation; - } - - /** - * @return Value status - */ - private ValueStatus status() { - return valStatus; - } - - /** - * Updates value status. - * - * @param valStatus Value status. - */ - private void status(ValueStatus valStatus) { - this.valStatus = valStatus; - } - - /** - * Updates both value and value status. - * - * @param val Value. - * @param storeOperation Store operation. - * @param valStatus Value status. - */ - private void update(@Nullable Entry<? extends K, ? extends V> val, - StoreOperation storeOperation, - ValueStatus valStatus) { - this.val = val; - this.storeOperation = storeOperation; - this.valStatus = valStatus; - } - - /** - * Awaits a signal on flush condition - * - * @throws IgniteInterruptedCheckedException If thread was interrupted. - */ - private void waitForFlush() throws IgniteInterruptedCheckedException { - U.await(flushCond); - } - - /** - * Signals flush condition. - */ - @SuppressWarnings({"SignalWithoutCorrespondingAwait"}) - private void signalFlushed() { - flushCond.signalAll(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (!(o instanceof StatefulValue)) - return false; - - StatefulValue other = (StatefulValue)o; - - return F.eq(val, other.val) && F.eq(valStatus, other.valStatus); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = val != null ? val.hashCode() : 0; - - res = 31 * res + valStatus.hashCode(); - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StatefulValue.class, this); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 6c27566..f2dd0c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -360,7 +360,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** {@inheritDoc} */ @Override public void localLoad(Collection<? extends K> keys, final ExpiryPolicy plc) throws IgniteCheckedException { - if (ctx.store().isLocalStore()) { + if (ctx.store().isLocal()) { super.localLoad(keys, plc); return; @@ -377,7 +377,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys); - ctx.store().loadAllFromStore(null, keys0, new CI2<KeyCacheObject, Object>() { + ctx.store().loadAll(null, keys0, new CI2<KeyCacheObject, Object>() { @Override public void apply(KeyCacheObject key, Object val) { loadEntry(key, val, ver0, null, topVer, replicate, plc0); } @@ -386,7 +386,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** {@inheritDoc} */ @Override public void localLoadCache(final IgniteBiPredicate<K, V> p, Object[] args) throws IgniteCheckedException { - if (ctx.store().isLocalStore()) { + if (ctx.store().isLocal()) { super.localLoadCache(p, args); return; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 7195d1c..e6d5173 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -564,8 +564,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { log.debug("Entry has been cleared from swap storage: " + this); } - if (cctx.store().isLocalStore()) - cctx.store().removeFromStore(null, keyValue(false)); + if (cctx.store().isLocal()) + cctx.store().remove(null, keyValue(false)); rmv = true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 4e72bd1..c433698 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -494,7 +494,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> try { GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> it = cctx.swap().iterator(id); - boolean isLocStore = cctx.store().isLocalStore(); + boolean isLocStore = cctx.store().isLocal(); if (it != null) { // We can safely remove these values because no entries will be created for evicted partition. @@ -508,7 +508,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> cctx.swap().remove(key); if (isLocStore) - cctx.store().removeFromStore(null, key.value(cctx.cacheObjectContext(), false)); + cctx.store().remove(null, key.value(cctx.cacheObjectContext(), false)); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 99fb724..7e93946 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -945,7 +945,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo } try { - cctx.store().loadAllFromStore( + cctx.store().loadAll( null, loadMap.keySet(), new CI2<KeyCacheObject, Object>() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index c376fa0..5e8a1f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1135,7 +1135,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (keys.size() > 1 && // Several keys ... writeThrough() && // and store is enabled ... - !ctx.store().isLocalStore() && // and this is not local store ... + !ctx.store().isLocal() && // and this is not local store ... !ctx.dr().receiveEnabled() // and no DR. ) { // This method can only be used when there are no replicated entries in the batch. @@ -1614,7 +1614,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (needReload != null) { final Map<KeyCacheObject, Integer> idxMap = needReload; - ctx.store().loadAllFromStore(null, needReload.keySet(), new CI2<KeyCacheObject, Object>() { + ctx.store().loadAll(null, needReload.keySet(), new CI2<KeyCacheObject, Object>() { @Override public void apply(KeyCacheObject k, Object v) { Integer idx = idxMap.get(k); @@ -1917,7 +1917,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { putMap; try { - ctx.store().putAllToStore(null, F.viewReadOnly(storeMap, new C1<Object, IgniteBiTuple<Object, GridCacheVersion>>() { + ctx.store().putAll(null, F.viewReadOnly(storeMap, new C1<Object, IgniteBiTuple<Object, GridCacheVersion>>() { @Override public IgniteBiTuple<Object, GridCacheVersion> apply(Object v) { return F.t(v, ver); } @@ -1940,7 +1940,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { rmvKeys; try { - ctx.store().removeAllFromStore(null, storeKeys); + ctx.store().removeAll(null, storeKeys); } catch (CacheStorePartialUpdateException e) { storeErr = e; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index a9a6f23..98d9283 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -1306,7 +1306,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { try { if (putMap != null) { try { - ctx.store().putAllToStore(null, F.viewReadOnly(putMap, new C1<Object, IgniteBiTuple<Object, GridCacheVersion>>() { + ctx.store().putAll(null, F.viewReadOnly(putMap, new C1<Object, IgniteBiTuple<Object, GridCacheVersion>>() { @Override public IgniteBiTuple<Object, GridCacheVersion> apply(Object v) { return F.t(v, ver); } @@ -1320,7 +1320,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { } else { try { - ctx.store().removeAllFromStore(null, rmvKeys); + ctx.store().removeAll(null, rmvKeys); } catch (CacheStorePartialUpdateException e) { storeErr = e; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java new file mode 100644 index 0000000..5fde622 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.store; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; + +/** + * Default store manager implementation. + */ +public class CacheOsStoreManager extends GridCacheStoreManagerAdapter { + /** Ignite context. */ + private final GridKernalContext ctx; + + /** Cache configuration. */ + private final CacheConfiguration cfg; + + /** + * Constructor. + * + * @param ctx Ignite context. + * @param cfg Cache configuration. + */ + public CacheOsStoreManager(GridKernalContext ctx, CacheConfiguration cfg) { + this.ctx = ctx; + this.cfg = cfg; + } + + /** {@inheritDoc} */ + @Override protected GridKernalContext igniteContext() { + return ctx; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration() { + return cfg; + } + + /** {@inheritDoc} */ + @Override protected boolean convertPortable() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java new file mode 100644 index 0000000..d9f50ac --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.store; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Cache store manager interface. + */ +public interface CacheStoreManager<K, V> extends GridCacheManager<K, V> { + /** + * Initialize store manager. + * + * @param cfgStore Actual store. + * @param sesHolders Session holders. + * @throws org.apache.ignite.IgniteCheckedException If failed. + */ + public void initialize(@Nullable CacheStore<?, ?> cfgStore, Map<CacheStore, ThreadLocal> sesHolders) + throws IgniteCheckedException; + + /** + * @return {@code true} If store configured. + */ + public boolean configured(); + + /** + * @return Wrapped store. + */ + public CacheStore<Object, Object> store(); + + /** + * @return Unwrapped store provided in configuration. + */ + public CacheStore<?, ?> configuredStore(); + + /** + * @return {@code true} If local store is configured. + */ + public boolean isLocal(); + + /** + * @return {@code True} is write-through is enabled. + */ + public boolean isWriteThrough(); + + /** + * @return Whether DHT transaction can write to store from DHT. + */ + public boolean isWriteToStoreFromDht(); + + /** + * Loads data from persistent store. + * + * @param tx Cache transaction. + * @param key Cache key. + * @return Loaded value, possibly <tt>null</tt>. + * @throws IgniteCheckedException If data loading failed. + */ + @Nullable public Object load(@Nullable IgniteInternalTx tx, KeyCacheObject key) throws IgniteCheckedException; + + /** + * Loads data from persistent store. + * + * @param tx Cache transaction. + * @param keys Cache keys. + * @param vis Closure. + * @return {@code True} if there is a persistent storage. + * @throws IgniteCheckedException If data loading failed. + */ + public boolean loadAll(@Nullable IgniteInternalTx tx, Collection<? extends KeyCacheObject> keys, + IgniteBiInClosure<KeyCacheObject, Object> vis) throws IgniteCheckedException; + + /** + * @param tx Cache transaction. + * @param keys Cache keys. + * @param vis Closure to apply for loaded elements. + * @throws IgniteCheckedException If data loading failed. + */ + public void localStoreLoadAll(@Nullable IgniteInternalTx tx, Collection<? extends KeyCacheObject> keys, + final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> vis) throws IgniteCheckedException; + + /** + * Loads data from persistent store. + * + * @param vis Closer to cache loaded elements. + * @param args User arguments. + * @return {@code True} if there is a persistent storage. + * @throws IgniteCheckedException If data loading failed. + */ + public boolean loadCache(final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> vis, Object[] args) + throws IgniteCheckedException; + + /** + * Puts key-value pair into storage. + * + * @param tx Cache transaction. + * @param key Key. + * @param val Value. + * @param ver Version. + * @return {@code true} If there is a persistent storage. + * @throws IgniteCheckedException If storage failed. + */ + public boolean put(@Nullable IgniteInternalTx tx, Object key, Object val, GridCacheVersion ver) + throws IgniteCheckedException; + + /** + * Puts key-value pair into storage. + * + * @param tx Cache transaction. + * @param map Map. + * @return {@code True} if there is a persistent storage. + * @throws IgniteCheckedException If storage failed. + */ + public boolean putAll(@Nullable IgniteInternalTx tx, Map<Object, IgniteBiTuple<Object, GridCacheVersion>> map) + throws IgniteCheckedException; + + /** + * @param tx Cache transaction. + * @param key Key. + * @return {@code True} if there is a persistent storage. + * @throws IgniteCheckedException If storage failed. + */ + public boolean remove(@Nullable IgniteInternalTx tx, Object key) throws IgniteCheckedException; + + /** + * @param tx Cache transaction. + * @param keys Key. + * @return {@code True} if there is a persistent storage. + * @throws IgniteCheckedException If storage failed. + */ + public boolean removeAll(@Nullable IgniteInternalTx tx, Collection<Object> keys) + throws IgniteCheckedException; + + /** + * @param tx Transaction. + * @param commit Commit. + * @throws IgniteCheckedException If failed. + */ + public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException; + + /** + * End session initiated by write-behind store. + */ + public void writeBehindSessionInit(); + + /** + * End session initiated by write-behind store. + * + * @param threwEx If exception was thrown. + * @throws IgniteCheckedException If failed. + */ + public void writeBehindSessionEnd(boolean threwEx) throws IgniteCheckedException; + + /** + * @throws IgniteCheckedException If failed. + */ + public void forceFlush() throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java new file mode 100644 index 0000000..f9a966c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -0,0 +1,1111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.store; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.transactions.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.integration.*; +import java.util.*; + +/** + * Store manager. + */ +@SuppressWarnings({"AssignmentToCatchBlockParameter", "unchecked"}) +public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapter implements CacheStoreManager { + /** */ + private static final UUID SES_ATTR = UUID.randomUUID(); + + /** */ + protected CacheStore<Object, Object> store; + + /** */ + protected CacheStore<?, ?> cfgStore; + + /** */ + private CacheStoreBalancingWrapper<Object, Object> singleThreadGate; + + /** */ + private ThreadLocal<SessionData> sesHolder; + + /** */ + private boolean locStore; + + /** */ + private boolean writeThrough; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void initialize(@Nullable CacheStore cfgStore, Map sesHolders) throws IgniteCheckedException { + GridKernalContext ctx = igniteContext(); + CacheConfiguration cfg = cacheConfiguration(); + + writeThrough = cfg.isWriteThrough(); + + this.cfgStore = cfgStore; + + store = cacheStoreWrapper(ctx, cfgStore, cfg); + + singleThreadGate = store == null ? null : new CacheStoreBalancingWrapper<>(store); + + ThreadLocal<SessionData> sesHolder0 = null; + + if (cfgStore != null) { + sesHolder0 = ((Map<CacheStore, ThreadLocal>)sesHolders).get(cfgStore); + + if (sesHolder0 == null) { + ThreadLocalSession locSes = new ThreadLocalSession(); + + if (ctx.resource().injectStoreSession(cfgStore, locSes)) { + sesHolder0 = locSes.sesHolder; + + sesHolders.put(cfgStore, sesHolder0); + } + } + } + + sesHolder = sesHolder0; + + locStore = U.hasAnnotation(cfgStore, CacheLocalStore.class); + } + + /** {@inheritDoc} */ + @Override public boolean isWriteThrough() { + return writeThrough; + } + + /** + * Creates a wrapped cache store if write-behind cache is configured. + * + * @param ctx Kernal context. + * @param cfgStore Store provided in configuration. + * @param cfg Cache configuration. + * @return Instance if {@link GridCacheWriteBehindStore} if write-behind store is configured, + * or user-defined cache store. + */ + @SuppressWarnings({"unchecked"}) + private CacheStore cacheStoreWrapper(GridKernalContext ctx, + @Nullable CacheStore cfgStore, + CacheConfiguration cfg) { + if (cfgStore == null || !cfg.isWriteBehindEnabled()) + return cfgStore; + + GridCacheWriteBehindStore store = new GridCacheWriteBehindStore(this, + ctx.gridName(), + cfg.getName(), + ctx.log(GridCacheWriteBehindStore.class), + cfgStore); + + store.setFlushSize(cfg.getWriteBehindFlushSize()); + store.setFlushThreadCount(cfg.getWriteBehindFlushThreadCount()); + store.setFlushFrequency(cfg.getWriteBehindFlushFrequency()); + store.setBatchSize(cfg.getWriteBehindBatchSize()); + + return store; + } + + /** {@inheritDoc} */ + @Override protected void start0() throws IgniteCheckedException { + if (store instanceof LifecycleAware) { + try { + // Avoid second start() call on store in case when near cache is enabled. + if (cctx.config().isWriteBehindEnabled()) { + if (!cctx.isNear()) + ((LifecycleAware)store).start(); + } + } + catch (Exception e) { + throw new IgniteCheckedException("Failed to start cache store: " + e, e); + } + } + } + + /** {@inheritDoc} */ + @Override protected void stop0(boolean cancel) { + if (store instanceof LifecycleAware) { + try { + // Avoid second start() call on store in case when near cache is enabled. + if (cctx.config().isWriteBehindEnabled()) { + if (!cctx.isNear()) + ((LifecycleAware)store).stop(); + } + } + catch (Exception e) { + U.error(log(), "Failed to stop cache store.", e); + } + } + } + + /** {@inheritDoc} */ + @Override public boolean isLocal() { + return locStore; + } + + /** {@inheritDoc} */ + @Override public boolean configured() { + return store != null; + } + + /** {@inheritDoc} */ + @Override public CacheStore<?, ?> configuredStore() { + return cfgStore; + } + + /** {@inheritDoc} */ + @Override @Nullable public Object load(@Nullable IgniteInternalTx tx, KeyCacheObject key) + throws IgniteCheckedException { + return loadFromStore(tx, key, true); + } + + /** + * Loads data from persistent store. + * + * @param tx Cache transaction. + * @param key Cache key. + * @param convert Convert flag. + * @return Loaded value, possibly <tt>null</tt>. + * @throws IgniteCheckedException If data loading failed. + */ + @Nullable private Object loadFromStore(@Nullable IgniteInternalTx tx, + KeyCacheObject key, + boolean convert) + throws IgniteCheckedException { + if (store != null) { + if (key.internal()) + // Never load internal keys from store as they are never persisted. + return null; + + Object storeKey = key.value(cctx.cacheObjectContext(), false); + + if (convertPortable()) + storeKey = cctx.unwrapPortableIfNeeded(storeKey, false); + + if (log.isDebugEnabled()) + log.debug("Loading value from store for key: " + storeKey); + + sessionInit0(tx); + + boolean thewEx = true; + + Object val = null; + + try { + val = singleThreadGate.load(storeKey); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (CacheLoaderException e) { + throw new IgniteCheckedException(e); + } + catch (Exception e) { + throw new IgniteCheckedException(new CacheLoaderException(e)); + } + finally { + sessionEnd0(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Loaded value from store [key=" + key + ", val=" + val + ']'); + + if (convert) { + val = convert(val); + + return val; + } + else + return val; + } + + return null; + } + + /** + * @param val Internal value. + * @return User value. + */ + private Object convert(Object val) { + if (val == null) + return null; + + return locStore ? ((IgniteBiTuple<Object, GridCacheVersion>)val).get1() : val; + } + + /** {@inheritDoc} */ + @Override public boolean isWriteToStoreFromDht() { + return cctx.config().isWriteBehindEnabled() || locStore; + } + + /** {@inheritDoc} */ + @Override public void localStoreLoadAll(@Nullable IgniteInternalTx tx, Collection keys, GridInClosure3 vis) + throws IgniteCheckedException { + assert store != null; + assert locStore; + + loadAllFromStore(tx, keys, null, vis); + } + + /** {@inheritDoc} */ + @Override public boolean loadAll(@Nullable IgniteInternalTx tx, Collection keys, IgniteBiInClosure vis) + throws IgniteCheckedException { + if (store != null) { + loadAllFromStore(tx, keys, vis, null); + + return true; + } + else { + for (Object key : keys) + vis.apply(key, null); + } + + return false; + } + + /** + * @param tx Cache transaction. + * @param keys Keys to load. + * @param vis Key/value closure (only one of vis or verVis can be specified). + * @param verVis Key/value/version closure (only one of vis or verVis can be specified). + * @throws IgniteCheckedException If failed. + */ + private void loadAllFromStore(@Nullable IgniteInternalTx tx, + Collection<? extends KeyCacheObject> keys, + @Nullable final IgniteBiInClosure<KeyCacheObject, Object> vis, + @Nullable final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> verVis) + throws IgniteCheckedException { + assert vis != null ^ verVis != null; + assert verVis == null || locStore; + + final boolean convert = verVis == null; + + if (!keys.isEmpty()) { + if (keys.size() == 1) { + KeyCacheObject key = F.first(keys); + + if (convert) + vis.apply(key, load(tx, key)); + else { + IgniteBiTuple<Object, GridCacheVersion> t = + (IgniteBiTuple<Object, GridCacheVersion>)loadFromStore(tx, key, false); + + if (t != null) + verVis.apply(key, t.get1(), t.get2()); + } + + return; + } + + Collection<Object> keys0; + + if (convertPortable()) { + keys0 = F.viewReadOnly(keys, new C1<KeyCacheObject, Object>() { + @Override public Object apply(KeyCacheObject key) { + return cctx.unwrapPortableIfNeeded(key.value(cctx.cacheObjectContext(), false), false); + } + }); + } + else { + keys0 = F.viewReadOnly(keys, new C1<KeyCacheObject, Object>() { + @Override public Object apply(KeyCacheObject key) { + return key.value(cctx.cacheObjectContext(), false); + } + }); + } + + if (log.isDebugEnabled()) + log.debug("Loading values from store for keys: " + keys0); + + sessionInit0(tx); + + boolean thewEx = true; + + try { + IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() { + @SuppressWarnings("ConstantConditions") + @Override public void apply(Object k, Object val) { + if (convert) { + Object v = convert(val); + + vis.apply(cctx.toCacheKeyObject(k), v); + } + else { + IgniteBiTuple<Object, GridCacheVersion> v = (IgniteBiTuple<Object, GridCacheVersion>)val; + + if (v != null) + verVis.apply(cctx.toCacheKeyObject(k), v.get1(), v.get2()); + } + } + }; + + if (keys.size() > singleThreadGate.loadAllThreshold()) { + Map<Object, Object> map = store.loadAll(keys0); + + if (map != null) { + for (Map.Entry<Object, Object> e : map.entrySet()) + c.apply(cctx.toCacheKeyObject(e.getKey()), e.getValue()); + } + } + else + singleThreadGate.loadAll(keys0, c); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (CacheLoaderException e) { + throw new IgniteCheckedException(e); + } + catch (Exception e) { + throw new IgniteCheckedException(new CacheLoaderException(e)); + } + finally { + sessionEnd0(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Loaded values from store for keys: " + keys0); + } + } + + /** {@inheritDoc} */ + @Override public boolean loadCache(final GridInClosure3 vis, Object[] args) throws IgniteCheckedException { + if (store != null) { + if (log.isDebugEnabled()) + log.debug("Loading all values from store."); + + sessionInit0(null); + + boolean thewEx = true; + + try { + store.loadCache(new IgniteBiInClosure<Object, Object>() { + @Override public void apply(Object k, Object o) { + Object v; + GridCacheVersion ver = null; + + if (locStore) { + IgniteBiTuple<Object, GridCacheVersion> t = (IgniteBiTuple<Object, GridCacheVersion>)o; + + v = t.get1(); + ver = t.get2(); + } + else + v = o; + + KeyCacheObject cacheKey = cctx.toCacheKeyObject(k); + + vis.apply(cacheKey, v, ver); + } + }, args); + + thewEx = false; + } + catch (CacheLoaderException e) { + throw new IgniteCheckedException(e); + } + catch (Exception e) { + throw new IgniteCheckedException(new CacheLoaderException(e)); + } + finally { + sessionEnd0(null, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Loaded all values from store."); + + return true; + } + + LT.warn(log, null, "Calling Cache.loadCache() method will have no effect, " + + "CacheConfiguration.getStore() is not defined for cache: " + cctx.namexx()); + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean put(@Nullable IgniteInternalTx tx, Object key, Object val, GridCacheVersion ver) + throws IgniteCheckedException { + if (store != null) { + // Never persist internal keys. + if (key instanceof GridCacheInternal) + return true; + + if (convertPortable()) { + key = cctx.unwrapPortableIfNeeded(key, false); + val = cctx.unwrapPortableIfNeeded(val, false); + } + + if (log.isDebugEnabled()) + log.debug("Storing value in cache store [key=" + key + ", val=" + val + ']'); + + sessionInit0(tx); + + boolean thewEx = true; + + try { + store.write(new CacheEntryImpl<>(key, locStore ? F.t(val, ver) : val)); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (CacheWriterException e) { + throw new IgniteCheckedException(e); + } + catch (Exception e) { + throw new IgniteCheckedException(new CacheWriterException(e)); + } + finally { + sessionEnd0(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Stored value in cache store [key=" + key + ", val=" + val + ']'); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean putAll(@Nullable IgniteInternalTx tx, Map map) throws IgniteCheckedException { + if (F.isEmpty(map)) + return true; + + if (map.size() == 1) { + Map.Entry<Object, IgniteBiTuple<Object, GridCacheVersion>> e = + ((Map<Object, IgniteBiTuple<Object, GridCacheVersion>>)map).entrySet().iterator().next(); + + return put(tx, e.getKey(), e.getValue().get1(), e.getValue().get2()); + } + else { + if (store != null) { + EntriesView entries = new EntriesView(map); + + if (log.isDebugEnabled()) + log.debug("Storing values in cache store [entries=" + entries + ']'); + + sessionInit0(tx); + + boolean thewEx = true; + + try { + store.writeAll(entries); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (Exception e) { + if (!(e instanceof CacheWriterException)) + e = new CacheWriterException(e); + + if (!entries.isEmpty()) { + List<Object> keys = new ArrayList<>(entries.size()); + + for (Cache.Entry<?, ?> entry : entries) + keys.add(entry.getKey()); + + throw new CacheStorePartialUpdateException(keys, e); + } + + throw new IgniteCheckedException(e); + } + finally { + sessionEnd0(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Stored value in cache store [entries=" + entries + ']'); + + return true; + } + + return false; + } + } + + /** {@inheritDoc} */ + @Override public boolean remove(@Nullable IgniteInternalTx tx, Object key) throws IgniteCheckedException { + if (store != null) { + // Never remove internal key from store as it is never persisted. + if (key instanceof GridCacheInternal) + return false; + + if (convertPortable()) + key = cctx.unwrapPortableIfNeeded(key, false); + + if (log.isDebugEnabled()) + log.debug("Removing value from cache store [key=" + key + ']'); + + sessionInit0(tx); + + boolean thewEx = true; + + try { + store.delete(key); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (CacheWriterException e) { + throw new IgniteCheckedException(e); + } + catch (Exception e) { + throw new IgniteCheckedException(new CacheWriterException(e)); + } + finally { + sessionEnd0(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Removed value from cache store [key=" + key + ']'); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys) + throws IgniteCheckedException { + if (F.isEmpty(keys)) + return true; + + if (keys.size() == 1) { + Object key = keys.iterator().next(); + + return remove(tx, key); + } + + if (store != null) { + Collection<Object> keys0 = convertPortable() ? cctx.unwrapPortablesIfNeeded(keys, false) : keys; + + if (log.isDebugEnabled()) + log.debug("Removing values from cache store [keys=" + keys0 + ']'); + + sessionInit0(tx); + + boolean thewEx = true; + + try { + store.deleteAll(keys0); + + thewEx = false; + } + catch (ClassCastException e) { + handleClassCastException(e); + } + catch (Exception e) { + if (!(e instanceof CacheWriterException)) + e = new CacheWriterException(e); + + if (!keys0.isEmpty()) + throw new CacheStorePartialUpdateException(keys0, e); + + throw new IgniteCheckedException(e); + } + finally { + sessionEnd0(tx, thewEx); + } + + if (log.isDebugEnabled()) + log.debug("Removed values from cache store [keys=" + keys0 + ']'); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public CacheStore<Object, Object> store() { + return store; + } + + /** {@inheritDoc} */ + @Override public void forceFlush() throws IgniteCheckedException { + if (store instanceof GridCacheWriteBehindStore) + ((GridCacheWriteBehindStore)store).forceFlush(); + } + + /** {@inheritDoc} */ + @Override public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException { + assert store != null; + + sessionInit0(tx); + + try { + store.sessionEnd(commit); + } + finally { + if (sesHolder != null) { + sesHolder.set(null); + + tx.removeMeta(SES_ATTR); + } + } + } + + /** + * @param e Class cast exception. + * @throws IgniteCheckedException Thrown exception. + */ + private void handleClassCastException(ClassCastException e) throws IgniteCheckedException { + assert e != null; + + if (e.getMessage() != null) { + throw new IgniteCheckedException("Cache store must work with portable objects if portables are " + + "enabled for cache [cacheName=" + cctx.namex() + ']', e); + } + else + throw e; + } + + /** {@inheritDoc} */ + @Override public void writeBehindSessionInit() { + sessionInit0(null); + } + + /** {@inheritDoc} */ + @Override public void writeBehindSessionEnd(boolean threwEx) throws IgniteCheckedException { + sessionEnd0(null, threwEx); + } + + /** + * @param tx Current transaction. + */ + private void sessionInit0(@Nullable IgniteInternalTx tx) { + if (sesHolder == null) + return; + + assert sesHolder.get() == null; + + SessionData ses; + + if (tx != null) { + ses = tx.meta(SES_ATTR); + + if (ses == null) { + ses = new SessionData(tx, cctx.name()); + + tx.addMeta(SES_ATTR, ses); + } + else + // Session cache name may change in cross-cache transaction. + ses.cacheName(cctx.name()); + } + else + ses = new SessionData(null, cctx.name()); + + sesHolder.set(ses); + } + + /** + * Clears session holder. + */ + private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws IgniteCheckedException { + try { + if (tx == null) + store.sessionEnd(threwEx); + } + catch (Exception e) { + if (!threwEx) + throw U.cast(e); + } + finally { + if (sesHolder != null) + sesHolder.set(null); + } + } + + /** + * @return Ignite context. + */ + protected abstract GridKernalContext igniteContext(); + + /** + * @return Cache configuration. + */ + protected abstract CacheConfiguration cacheConfiguration(); + + /** + * @return Convert-portable flag. + */ + protected abstract boolean convertPortable(); + + /** + * + */ + private static class SessionData { + /** */ + @GridToStringExclude + private final IgniteInternalTx tx; + + /** */ + private String cacheName; + + /** */ + @GridToStringInclude + private Map<Object, Object> props; + + /** + * @param tx Current transaction. + * @param cacheName Cache name. + */ + private SessionData(@Nullable IgniteInternalTx tx, @Nullable String cacheName) { + this.tx = tx; + this.cacheName = cacheName; + } + + /** + * @return Transaction. + */ + @Nullable private Transaction transaction() { + return tx != null ? tx.proxy() : null; + } + + /** + * @return Properties. + */ + private Map<Object, Object> properties() { + if (props == null) + props = new GridLeanMap<>(); + + return props; + } + + /** + * @return Cache name. + */ + private String cacheName() { + return cacheName; + } + + /** + * @param cacheName Cache name. + */ + private void cacheName(String cacheName) { + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SessionData.class, this, "tx", CU.txString(tx)); + } + } + + /** + * + */ + private static class ThreadLocalSession implements CacheStoreSession { + /** */ + private final ThreadLocal<SessionData> sesHolder = new ThreadLocal<>(); + + /** {@inheritDoc} */ + @Nullable @Override public Transaction transaction() { + SessionData ses0 = sesHolder.get(); + + return ses0 != null ? ses0.transaction() : null; + } + + /** {@inheritDoc} */ + @Override public boolean isWithinTransaction() { + return transaction() != null; + } + + /** {@inheritDoc} */ + @Override public <K1, V1> Map<K1, V1> properties() { + SessionData ses0 = sesHolder.get(); + + return ses0 != null ? (Map<K1, V1>)ses0.properties() : null; + } + + /** {@inheritDoc} */ + @Nullable @Override public String cacheName() { + SessionData ses0 = sesHolder.get(); + + return ses0 != null ? ses0.cacheName() : null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ThreadLocalSession.class, this); + } + } + + /** + * + */ + private class EntriesView extends AbstractCollection<Cache.Entry<?, ?>> { + /** */ + private final Map<?, IgniteBiTuple<?, GridCacheVersion>> map; + + /** */ + private Set<Object> rmvd; + + /** */ + private boolean cleared; + + /** + * @param map Map. + */ + private EntriesView(Map<?, IgniteBiTuple<?, GridCacheVersion>> map) { + assert map != null; + + this.map = map; + } + + /** {@inheritDoc} */ + @Override public int size() { + return cleared ? 0 : (map.size() - (rmvd != null ? rmvd.size() : 0)); + } + + /** {@inheritDoc} */ + @Override public boolean isEmpty() { + return cleared || !iterator().hasNext(); + } + + /** {@inheritDoc} */ + @Override public boolean contains(Object o) { + if (cleared || !(o instanceof Cache.Entry)) + return false; + + Cache.Entry<?, ?> e = (Cache.Entry<?, ?>)o; + + return map.containsKey(e.getKey()); + } + + /** {@inheritDoc} */ + @NotNull @Override public Iterator<Cache.Entry<?, ?>> iterator() { + if (cleared) + return F.emptyIterator(); + + final Iterator<Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>>> it0 = (Iterator)map.entrySet().iterator(); + + return new Iterator<Cache.Entry<?, ?>>() { + /** */ + private Cache.Entry<?, ?> cur; + + /** */ + private Cache.Entry<?, ?> next; + + /** + * + */ + { + checkNext(); + } + + /** + * + */ + private void checkNext() { + while (it0.hasNext()) { + Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>> e = it0.next(); + + Object k = e.getKey(); + + if (rmvd != null && rmvd.contains(k)) + continue; + + Object v = locStore ? e.getValue() : e.getValue().get1(); + + if (convertPortable()) { + k = cctx.unwrapPortableIfNeeded(k, false); + v = cctx.unwrapPortableIfNeeded(v, false); + } + + next = new CacheEntryImpl<>(k, v); + + break; + } + } + + @Override public boolean hasNext() { + return next != null; + } + + @Override public Cache.Entry<?, ?> next() { + if (next == null) + throw new NoSuchElementException(); + + cur = next; + + next = null; + + checkNext(); + + return cur; + } + + @Override public void remove() { + if (cur == null) + throw new IllegalStateException(); + + addRemoved(cur); + + cur = null; + } + }; + } + + /** {@inheritDoc} */ + @Override public boolean add(Cache.Entry<?, ?> entry) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean addAll(Collection<? extends Cache.Entry<?, ?>> col) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean remove(Object o) { + if (cleared || !(o instanceof Cache.Entry)) + return false; + + Cache.Entry<?, ?> e = (Cache.Entry<?, ?>)o; + + if (rmvd != null && rmvd.contains(e.getKey())) + return false; + + if (mapContains(e)) { + addRemoved(e); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean containsAll(Collection<?> col) { + if (cleared) + return false; + + for (Object o : col) { + if (contains(o)) + return false; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean removeAll(Collection<?> col) { + if (cleared) + return false; + + boolean modified = false; + + for (Object o : col) { + if (remove(o)) + modified = true; + } + + return modified; + } + + /** {@inheritDoc} */ + @Override public boolean retainAll(Collection<?> col) { + if (cleared) + return false; + + boolean modified = false; + + for (Cache.Entry<?, ?> e : this) { + if (!col.contains(e)) { + addRemoved(e); + + modified = true; + } + } + + return modified; + } + + /** {@inheritDoc} */ + @Override public void clear() { + cleared = true; + } + + /** + * @param e Entry. + */ + private void addRemoved(Cache.Entry<?, ?> e) { + if (rmvd == null) + rmvd = new HashSet<>(); + + rmvd.add(e.getKey()); + } + + /** + * @param e Entry. + * @return {@code True} if original map contains entry. + */ + private boolean mapContains(Cache.Entry<?, ?> e) { + return map.containsKey(e.getKey()); + } + + /** {@inheritDoc} */ + public String toString() { + Iterator<Cache.Entry<?, ?>> it = iterator(); + + if (!it.hasNext()) + return "[]"; + + SB sb = new SB("["); + + while (true) { + Cache.Entry<?, ?> e = it.next(); + + sb.a(e.toString()); + + if (!it.hasNext()) + return sb.a(']').toString(); + + sb.a(", "); + } + } + } +}