http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --cc
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 0000000,ad6a1a2..ff06f9c7
mode 000000,100644..100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@@ -1,0 -1,2037 +1,2040 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.cache.eviction.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.thread.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.processors.timeout.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.worker.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+ import sun.misc.*;
+
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ import java.util.concurrent.locks.*;
+ import java.util.concurrent.locks.Lock;
+
+ import static java.util.concurrent.TimeUnit.*;
+ import static org.apache.ignite.events.IgniteEventType.*;
+ import static org.apache.ignite.cache.CacheMemoryMode.*;
+ import static org.apache.ignite.cache.CacheMode.*;
+ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*;
+ import static
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+ import static org.jdk8.backport.ConcurrentLinkedDeque8.*;
+
+ /**
+ * Cache eviction manager.
+ */
+ public class GridCacheEvictionManager<K, V> extends
GridCacheManagerAdapter<K, V> {
+ /** Unsafe instance. */
+ private static final Unsafe unsafe = GridUnsafe.unsafe();
+
+ /** Eviction policy. */
+ private CacheEvictionPolicy<K, V> plc;
+
+ /** Eviction filter. */
+ private CacheEvictionFilter<K, V> filter;
+
+ /** Eviction buffer. */
+ private final ConcurrentLinkedDeque8<EvictionInfo> bufEvictQ = new
ConcurrentLinkedDeque8<>();
+
+ /** Attribute name used to queue node in entry metadata. */
+ private final String meta = UUID.randomUUID().toString();
+
+ /** Active eviction futures. */
+ private final Map<Long, EvictionFuture> futs = new ConcurrentHashMap8<>();
+
+ /** Futures count modification lock. */
+ private final Lock futsCntLock = new ReentrantLock();
+
+ /** Futures count condition. */
+ private final Condition futsCntCond = futsCntLock.newCondition();
+
+ /** Active futures count. */
+ private volatile int activeFutsCnt;
+
+ /** Max active futures count. */
+ private int maxActiveFuts;
+
+ /** Generator of future IDs. */
+ private final AtomicLong idGen = new AtomicLong();
+
+ /** Evict backup synchronized flag. */
+ private boolean evictSync;
+
+ /** Evict near synchronized flag. */
+ private boolean nearSync;
+
+ /** Flag to hold {@code evictSync || nearSync} result. */
+ private boolean evictSyncAgr;
+
+ /** Policy enabled. */
+ private boolean plcEnabled;
+
+ /** */
+ private CacheMemoryMode memoryMode;
+
+ /** Backup entries worker. */
+ private BackupWorker backupWorker;
+
+ /** Backup entries worker thread. */
+ private IgniteThread backupWorkerThread;
+
+ /** Busy lock. */
+ private final GridBusyLock busyLock = new GridBusyLock();
+
+ /** Stopping flag. */
+ private volatile boolean stopping;
+
+ /** Current future. */
+ private final AtomicReference<EvictionFuture> curEvictFut = new
AtomicReference<>();
+
+ /** First eviction flag. */
+ private volatile boolean firstEvictWarn;
+
+ /** {@inheritDoc} */
+ @Override public void start0() throws IgniteCheckedException {
+ CacheConfiguration cfg = cctx.config();
+
+ plc = cctx.isNear() ? cfg.<K, V>getNearEvictionPolicy() : cfg.<K,
V>getEvictionPolicy();
+
+ memoryMode = cctx.config().getMemoryMode();
+
+ plcEnabled = plc != null && memoryMode != OFFHEAP_TIERED;
+
+ filter = cfg.getEvictionFilter();
+
+ if (cfg.getEvictMaxOverflowRatio() < 0)
+ throw new IgniteCheckedException("Configuration parameter
'maxEvictOverflowRatio' cannot be negative.");
+
+ if (cfg.getEvictSynchronizedKeyBufferSize() < 0)
+ throw new IgniteCheckedException("Configuration parameter
'evictSynchronizedKeyBufferSize' cannot be negative.");
+
+ if (!cctx.isLocal()) {
+ evictSync = cfg.isEvictSynchronized() && !cctx.isNear() &&
!cctx.isSwapOrOffheapEnabled();
+
+ nearSync = cfg.isEvictNearSynchronized() && isNearEnabled(cctx)
&& !cctx.isNear();
+ }
+ else {
+ if (cfg.isEvictSynchronized())
+ U.warn(log, "Ignored 'evictSynchronized' configuration
property for LOCAL cache: " + cctx.namexx());
+
+ if (cfg.isEvictNearSynchronized())
+ U.warn(log, "Ignored 'evictNearSynchronized' configuration
property for LOCAL cache: " + cctx.namexx());
+ }
+
+ if (cctx.isDht() && !nearSync && evictSync && isNearEnabled(cctx))
+ throw new IgniteCheckedException("Illegal configuration (may lead
to data inconsistency) " +
+ "[evictSync=true, evictNearSync=false]");
+
+ reportConfigurationProblems();
+
+ evictSyncAgr = evictSync || nearSync;
+
+ if (evictSync && !cctx.isNear() && plcEnabled) {
+ backupWorker = new BackupWorker();
+
+ cctx.events().addListener(
+ new GridLocalEventListener() {
+ @Override public void onEvent(IgniteEvent evt) {
+ assert evt.type() == EVT_NODE_FAILED || evt.type() ==
EVT_NODE_LEFT ||
+ evt.type() == EVT_NODE_JOINED;
+
+ IgniteDiscoveryEvent discoEvt =
(IgniteDiscoveryEvent)evt;
+
+ // Notify backup worker on each topology change.
+ if (CU.affinityNode(cctx, discoEvt.eventNode()))
+ backupWorker.addEvent(discoEvt);
+ }
+ },
+ EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED);
+ }
+
+ if (evictSyncAgr) {
+ if (cfg.getEvictSynchronizedTimeout() <= 0)
+ throw new IgniteCheckedException("Configuration parameter
'evictSynchronousTimeout' should be positive.");
+
+ if (cfg.getEvictSynchronizedConcurrencyLevel() <= 0)
+ throw new IgniteCheckedException("Configuration parameter
'evictSynchronousConcurrencyLevel' " +
+ "should be positive.");
+
+ maxActiveFuts = cfg.getEvictSynchronizedConcurrencyLevel();
+
+ cctx.io().addHandler(cctx.cacheId(),
GridCacheEvictionRequest.class, new CI2<UUID, GridCacheEvictionRequest<K, V>>()
{
+ @Override public void apply(UUID nodeId,
GridCacheEvictionRequest<K, V> msg) {
+ processEvictionRequest(nodeId, msg);
+ }
+ });
+
+ cctx.io().addHandler(cctx.cacheId(),
GridCacheEvictionResponse.class, new CI2<UUID, GridCacheEvictionResponse<K,
V>>() {
+ @Override public void apply(UUID nodeId,
GridCacheEvictionResponse<K, V> msg) {
+ processEvictionResponse(nodeId, msg);
+ }
+ });
+
+ cctx.events().addListener(
+ new GridLocalEventListener() {
+ @Override public void onEvent(IgniteEvent evt) {
+ assert evt.type() == EVT_NODE_FAILED || evt.type() ==
EVT_NODE_LEFT;
+
+ IgniteDiscoveryEvent discoEvt =
(IgniteDiscoveryEvent)evt;
+
+ for (EvictionFuture fut : futs.values())
+ fut.onNodeLeft(discoEvt.eventNode().id());
+ }
+ },
+ EVT_NODE_FAILED, EVT_NODE_LEFT);
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Eviction manager started on node: " + cctx.nodeId());
+ }
+
+ /**
+ * Outputs warnings if potential configuration problems are detected.
+ */
+ private void reportConfigurationProblems() {
+ CacheMode mode = cctx.config().getCacheMode();
+
+ if (plcEnabled && !cctx.isNear() && mode == PARTITIONED) {
+ if (!evictSync) {
+ U.warn(log, "Evictions are not synchronized with other nodes
in topology " +
+ "which provides 2x-3x better performance but may cause
data inconsistency if cache store " +
+ "is not configured (consider changing 'evictSynchronized'
configuration property).",
+ "Evictions are not synchronized for cache: " +
cctx.namexx());
+ }
+
+ if (!nearSync && isNearEnabled(cctx)) {
+ U.warn(log, "Evictions on primary node are not synchronized
with near caches on other nodes " +
+ "which provides 2x-3x better performance but may cause
data inconsistency (consider changing " +
+ "'nearEvictSynchronized' configuration property).",
+ "Evictions are not synchronized with near caches on other
nodes for cache: " + cctx.namexx());
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onKernalStart0() throws IgniteCheckedException {
+ super.onKernalStart0();
+
+ if (plcEnabled && evictSync && !cctx.isNear()) {
+ // Add dummy event to worker.
+ ClusterNode locNode = cctx.localNode();
+
+ IgniteDiscoveryEvent evt = new IgniteDiscoveryEvent(locNode,
"Dummy event.", EVT_NODE_JOINED, locNode);
+
+ evt.topologySnapshot(locNode.order(),
cctx.discovery().topology(locNode.order()));
+
+ backupWorker.addEvent(evt);
+
+ backupWorkerThread = new IgniteThread(backupWorker);
+ backupWorkerThread.start();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onKernalStop0(boolean cancel) {
+ super.onKernalStop0(cancel);
+
+ // Change stopping first.
+ stopping = true;
+
+ busyLock.block();
+
+ // Stop backup worker.
+ if (evictSync && !cctx.isNear() && backupWorker != null) {
+ backupWorker.cancel();
+
+ U.join(backupWorkerThread, log);
+ }
+
+ // Cancel all active futures.
+ for (EvictionFuture fut : futs.values())
+ fut.cancel();
+
+ if (log.isDebugEnabled())
+ log.debug("Eviction manager stopped on node: " + cctx.nodeId());
+ }
+
+ /**
+ * @return Current size of evict queue.
+ */
+ public int evictQueueSize() {
+ return bufEvictQ.sizex();
+ }
+
+ /**
+ * @param nodeId Sender node ID.
+ * @param res Response.
+ */
+ private void processEvictionResponse(UUID nodeId,
GridCacheEvictionResponse<K, V> res) {
+ assert nodeId != null;
+ assert res != null;
+
+ if (log.isDebugEnabled())
+ log.debug("Processing eviction response [node=" + nodeId + ",
localNode=" + cctx.nodeId() +
+ ", res=" + res + ']');
+
+ if (!busyLock.enterBusy())
+ return;
+
+ try {
+ EvictionFuture fut = futs.get(res.futureId());
+
+ if (fut != null)
+ fut.onResponse(nodeId, res);
+ else if (log.isDebugEnabled())
+ log.debug("Eviction future for response is not found [res=" +
res + ", node=" + nodeId +
+ ", localNode=" + cctx.nodeId() + ']');
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * @param nodeId Sender node ID.
+ * @param req Request.
+ */
+ private void processEvictionRequest(UUID nodeId,
GridCacheEvictionRequest<K, V> req) {
+ assert nodeId != null;
+ assert req != null;
+
+ if (!busyLock.enterBusy())
+ return;
+
+ try {
+ if (req.classError() != null) {
+ if (log.isDebugEnabled())
+ log.debug("Class got undeployed during eviction: " +
req.classError());
+
+ sendEvictionResponse(nodeId, new GridCacheEvictionResponse<K,
V>(cctx.cacheId(), req.futureId(), true));
+
+ return;
+ }
+
+ long topVer = lockTopology();
+
+ try {
+ if (topVer != req.topologyVersion()) {
+ if (log.isDebugEnabled())
+ log.debug("Topology version is different [locTopVer="
+ topVer +
+ ", rmtTopVer=" + req.topologyVersion() + ']');
+
+ sendEvictionResponse(nodeId,
+ new GridCacheEvictionResponse<K, V>(cctx.cacheId(),
req.futureId(), true));
+
+ return;
+ }
+
+ processEvictionRequest0(nodeId, req);
+ }
+ finally {
+ unlockTopology();
+ }
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * @param nodeId Sender node ID.
+ * @param req Request.
+ */
+ private void processEvictionRequest0(UUID nodeId,
GridCacheEvictionRequest<K, V> req) {
+ if (log.isDebugEnabled())
+ log.debug("Processing eviction request [node=" + nodeId + ",
localNode=" + cctx.nodeId() +
+ ", reqSize=" + req.entries().size() + ']');
+
+ // Partition -> {{Key, Version}, ...}.
+ // Group DHT and replicated cache entries by their partitions.
+ Map<Integer, Collection<GridTuple3<K, GridCacheVersion, Boolean>>>
dhtEntries =
+ new HashMap<>();
+
+ Collection<GridTuple3<K, GridCacheVersion, Boolean>> nearEntries =
+ new LinkedList<>();
+
+ for (GridTuple3<K, GridCacheVersion, Boolean> t : req.entries()) {
+ Boolean near = t.get3();
+
+ if (!near) {
+ // Lock is required.
+ Collection<GridTuple3<K, GridCacheVersion, Boolean>> col =
+ F.addIfAbsent(dhtEntries,
cctx.affinity().partition(t.get1()),
+ new LinkedList<GridTuple3<K, GridCacheVersion,
Boolean>>());
+
+ assert col != null;
+
+ col.add(t);
+ }
+ else
+ nearEntries.add(t);
+ }
+
+ GridCacheEvictionResponse<K, V> res = new
GridCacheEvictionResponse<>(cctx.cacheId(), req.futureId());
+
+ GridCacheVersion obsoleteVer = cctx.versions().next();
+
+ // DHT and replicated cache entries.
+ for (Map.Entry<Integer, Collection<GridTuple3<K, GridCacheVersion,
Boolean>>> e : dhtEntries.entrySet()) {
+ int part = e.getKey();
+
+ boolean locked = lockPartition(part); // Will return false if
preloading is disabled.
+
+ try {
+ for (GridTuple3<K, GridCacheVersion, Boolean> t :
e.getValue()) {
+ K key = t.get1();
+ GridCacheVersion ver = t.get2();
+ Boolean near = t.get3();
+
+ assert !near;
+
+ boolean evicted = evictLocally(key, ver, near,
obsoleteVer);
+
+ if (log.isDebugEnabled())
+ log.debug("Evicted key [key=" + key + ", ver=" + ver
+ ", near=" + near +
+ ", evicted=" + evicted +']');
+
+ if (locked && evicted)
+ // Preloading is in progress, we need to save
eviction info.
+ saveEvictionInfo(key, ver, part);
+
+ if (!evicted)
+ res.addRejected(key);
+ }
+ }
+ finally {
+ if (locked)
+ unlockPartition(part);
+ }
+ }
+
+ // Near entries.
+ for (GridTuple3<K, GridCacheVersion, Boolean> t : nearEntries) {
+ K key = t.get1();
+ GridCacheVersion ver = t.get2();
+ Boolean near = t.get3();
+
+ assert near;
+
+ boolean evicted = evictLocally(key, ver, near, obsoleteVer);
+
+ if (log.isDebugEnabled())
+ log.debug("Evicted key [key=" + key + ", ver=" + ver + ",
near=" + near +
+ ", evicted=" + evicted +']');
+
+ if (!evicted)
+ res.addRejected(key);
+ }
+
+ sendEvictionResponse(nodeId, res);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ private void sendEvictionResponse(UUID nodeId,
GridCacheEvictionResponse<K, V> res) {
+ try {
+ cctx.io().send(nodeId, res);
+
+ if (log.isDebugEnabled())
+ log.debug("Sent eviction response [node=" + nodeId + ",
localNode=" + cctx.nodeId() +
+ ", res" + res + ']');
+ }
+ catch (ClusterTopologyException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send eviction response since initiating
node left grid " +
+ "[node=" + nodeId + ", localNode=" + cctx.nodeId() + ']');
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send eviction response to node [node=" +
nodeId +
+ ", localNode=" + cctx.nodeId() + ", res" + res + ']', e);
+ }
+ }
+
+ /**
+ * @param key Key.
+ * @param ver Version.
+ * @param p Partition ID.
+ */
+ private void saveEvictionInfo(K key, GridCacheVersion ver, int p) {
+ assert cctx.preloadEnabled();
+
+ if (!cctx.isNear()) {
+ try {
+ GridDhtLocalPartition<K, V> part =
cctx.dht().topology().localPartition(p, -1, false);
+
+ assert part != null;
+
+ part.onEntryEvicted(key, ver);
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Partition does not belong to local node
[part=" + p +
+ ", nodeId" + cctx.localNode().id() + ']');
+ }
+ }
+ else
+ assert false : "Failed to save eviction info: " + cctx.namexx();
+ }
+
+ /**
+ * @param p Partition ID.
+ * @return {@code True} if partition has been actually locked,
+ * {@code false} if preloading is finished or disabled and no lock
is needed.
+ */
+ private boolean lockPartition(int p) {
+ if (!cctx.preloadEnabled())
+ return false;
+
+ if (!cctx.isNear()) {
+ try {
+ GridDhtLocalPartition<K, V> part =
cctx.dht().topology().localPartition(p, -1, false);
+
+ if (part != null && part.reserve()) {
+ part.lock();
+
+ if (part.state() != MOVING) {
+ part.unlock();
+
+ part.release();
+
+ return false;
+ }
+
+ return true;
+ }
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Partition does not belong to local node
[part=" + p +
+ ", nodeId" + cctx.localNode().id() + ']');
+ }
+ }
+
+ // No lock is needed.
+ return false;
+ }
+
+ /**
+ * @param p Partition ID.
+ */
+ private void unlockPartition(int p) {
+ if (!cctx.preloadEnabled())
+ return;
+
+ if (!cctx.isNear()) {
+ try {
+ GridDhtLocalPartition<K, V> part =
cctx.dht().topology().localPartition(p, -1, false);
+
+ if (part != null) {
+ part.unlock();
+
+ part.release();
+ }
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Partition does not belong to local node
[part=" + p +
+ ", nodeId" + cctx.localNode().id() + ']');
+ }
+ }
+ }
+
+ /**
+ * Locks topology (for DHT cache only) and returns its version.
+ *
+ * @return Topology version after lock.
+ */
+ private long lockTopology() {
+ if (!cctx.isNear()) {
+ cctx.dht().topology().readLock();
+
+ return cctx.dht().topology().topologyVersion();
+ }
+
+ return 0;
+ }
+
+ /**
+ * Unlocks topology.
+ */
+ private void unlockTopology() {
+ if (!cctx.isNear())
+ cctx.dht().topology().readUnlock();
+ }
+
+ /**
+ * @param key Key to evict.
+ * @param ver Entry version on initial node.
+ * @param near {@code true} if entry should be evicted from near cache.
+ * @param obsoleteVer Obsolete version.
+ * @return {@code true} if evicted successfully, {@code false} if could
not be evicted.
+ */
+ private boolean evictLocally(K key, final GridCacheVersion ver, boolean
near, GridCacheVersion obsoleteVer) {
+ assert key != null;
+ assert ver != null;
+ assert obsoleteVer != null;
+ assert evictSyncAgr;
+ assert !cctx.isNear() || cctx.isReplicated();
+
+ if (log.isDebugEnabled())
+ log.debug("Evicting key locally [key=" + key + ", ver=" + ver +
", obsoleteVer=" + obsoleteVer +
+ ", localNode=" + cctx.localNode() + ']');
+
+ GridCacheAdapter<K, V> cache = near ? cctx.dht().near() :
cctx.cache();
+
+ GridCacheEntryEx<K, V> entry = cache.peekEx(key);
+
+ if (entry == null)
+ return true;
+
+ try {
+ // If entry should be evicted from near cache it can be done
safely
+ // without any consistency risks. We don't use filter in this
case.
+ // If entry should be evicted from DHT cache, we do not compare
versions
+ // as well because versions may change outside the transaction.
+ return evict0(cache, entry, obsoleteVer, null, false);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to evict entry on remote node [key=" + key +
", localNode=" + cctx.nodeId() + ']', e);
+
+ return false;
+ }
+ }
+
+ /**
+ * @param cache Cache from which to evict entry.
+ * @param entry Entry to evict.
+ * @param obsoleteVer Obsolete version.
+ * @param filter Filter.
+ * @param explicit If eviction is initiated by user.
+ * @return {@code true} if entry has been evicted.
+ * @throws IgniteCheckedException If failed to evict entry.
+ */
+ private boolean evict0(GridCacheAdapter<K, V> cache, GridCacheEntryEx<K,
V> entry, GridCacheVersion obsoleteVer,
+ @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter, boolean
explicit) throws IgniteCheckedException {
+ assert cache != null;
+ assert entry != null;
+ assert obsoleteVer != null;
+
+ boolean recordable =
cctx.events().isRecordable(EVT_CACHE_ENTRY_EVICTED);
+
+ V oldVal = recordable ? entry.rawGet() : null;
+
+ boolean hasVal = recordable && entry.hasValue();
+
+ boolean evicted = entry.evictInternal(cctx.isSwapOrOffheapEnabled(),
obsoleteVer, filter);
+
+ if (evicted) {
+ // Remove manually evicted entry from policy.
+ if (explicit && plcEnabled)
+ notifyPolicy(entry);
+
+ cache.removeEntry(entry);
+
++ if (cache.configuration().isStatisticsEnabled())
++ cache.metrics0().onEvict();
++
+ if (recordable)
+ cctx.events().addEvent(entry.partition(), entry.key(),
cctx.nodeId(), (IgniteUuid)null, null,
+ EVT_CACHE_ENTRY_EVICTED, null, false, oldVal, hasVal,
null, null, null);
+
+ if (log.isDebugEnabled())
+ log.debug("Entry was evicted [entry=" + entry + ",
localNode=" + cctx.nodeId() + ']');
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Entry was not evicted [entry=" + entry + ",
localNode=" + cctx.nodeId() + ']');
+
+ return evicted;
+ }
+
+ /**
+ * @param txEntry Transactional entry.
+ */
+ public void touch(IgniteTxEntry<K, V> txEntry, boolean loc) {
+ if (!plcEnabled && memoryMode != OFFHEAP_TIERED)
+ return;
+
+ if (!loc) {
+ if (cctx.isNear())
+ return;
+
+ if (evictSync)
+ return;
+ }
+
+ GridCacheEntryEx<K, V> e = txEntry.cached();
+
+ if (e.detached() || e.isInternal())
+ return;
+
+ try {
+ if (e.markObsoleteIfEmpty(null) || e.obsolete())
+ e.context().cache().removeEntry(e);
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to evict entry from cache: " + e, ex);
+ }
+
+ if (memoryMode == OFFHEAP_TIERED) {
+ try {
+ evict0(cctx.cache(), e, cctx.versions().next(), null, false);
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to evict entry from on heap memory: " +
e, ex);
+ }
+ }
+ else {
+ notifyPolicy(e);
+
+ if (evictSyncAgr)
+ waitForEvictionFutures();
+ }
+ }
+
+ /**
+ * @param e Entry for eviction policy notification.
+ * @param topVer Topology version.
+ */
+ public void touch(GridCacheEntryEx<K, V> e, long topVer) {
+ if (e.detached() || e.isInternal())
+ return;
+
+ try {
+ if (e.markObsoleteIfEmpty(null) || e.obsolete())
+ e.context().cache().removeEntry(e);
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to evict entry from cache: " + e, ex);
+ }
+
+ if (memoryMode == OFFHEAP_TIERED) {
+ try {
+ evict0(cctx.cache(), e, cctx.versions().next(), null, false);
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to evict entry from on heap memory: " +
e, ex);
+ }
+
+ return;
+ }
+
+ if (!plcEnabled)
+ return;
+
+ // Don't track non-primary entries if evicts are synchronized.
+ if (!cctx.isNear() && evictSync &&
!cctx.affinity().primary(cctx.localNode(), e.partition(), topVer))
+ return;
+
+ if (!busyLock.enterBusy())
+ return;
+
+ try {
+ // Wait for futures to finish.
+ if (evictSyncAgr)
+ waitForEvictionFutures();
+
+ if (log.isDebugEnabled())
+ log.debug("Touching entry [entry=" + e + ", localNode=" +
cctx.nodeId() + ']');
+
+ notifyPolicy(e);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * @param e Entry for eviction policy notification.
+ */
+ private void touch0(GridCacheEntryEx<K, V> e) {
+ assert evictSyncAgr;
+ assert plcEnabled;
+
+ // Do not wait for futures here since only limited number
+ // of entries can be passed to this method.
+ notifyPolicy(e);
+ }
+
+ /**
+ * @param entries Entries for eviction policy notification.
+ */
+ private void touchOnTopologyChange(Iterable<? extends GridCacheEntryEx<K,
V>> entries) {
+ assert evictSync;
+ assert plcEnabled;
+
+ if (log.isDebugEnabled())
+ log.debug("Touching entries [entries=" + entries + ", localNode="
+ cctx.nodeId() + ']');
+
+ for (GridCacheEntryEx<K, V> e : entries) {
+ if (e.key() instanceof GridCacheInternal)
+ // Skip internal entry.
+ continue;
+
+ // Do not wait for futures here since only limited number
+ // of entries can be passed to this method.
+ notifyPolicy(e);
+ }
+ }
+
+ /**
+ * Warns on first eviction.
+ */
+ private void warnFirstEvict() {
+ // Do not move warning output to synchronized block (it causes
warning in IDE).
+ synchronized (this) {
+ if (firstEvictWarn)
+ return;
+
+ firstEvictWarn = true;
+ }
+
+ U.warn(log, "Evictions started (cache may have reached its
capacity)." +
+ " You may wish to increase 'maxSize' on eviction policy being
used for cache: " + cctx.name(),
+ "Evictions started (cache may have reached its capacity): " +
cctx.name());
+ }
+
+ /**
+ * @return {@code True} if either evicts or near evicts are synchronized,
{@code false} otherwise.
+ */
+ public boolean evictSyncOrNearSync() {
+ return evictSyncAgr;
+ }
+
+ /**
+ * @param entry Entry to attempt to evict.
+ * @param obsoleteVer Obsolete version.
+ * @param filter Optional entry filter.
+ * @param explicit {@code True} if evict is called explicitly, {@code
false} if it's called
+ * from eviction policy.
+ * @return {@code True} if entry was marked for eviction.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public boolean evict(@Nullable GridCacheEntryEx<K, V> entry, @Nullable
GridCacheVersion obsoleteVer,
+ boolean explicit, @Nullable IgnitePredicate<CacheEntry<K, V>>[]
filter) throws IgniteCheckedException {
+ if (entry == null)
+ return true;
+
+ // Do not evict internal entries.
+ if (entry.key() instanceof GridCacheInternal)
+ return false;
+
+ if (!cctx.isNear() && !explicit && !firstEvictWarn)
+ warnFirstEvict();
+
+ if (evictSyncAgr) {
+ assert !cctx.isNear(); // Make sure cache is not NEAR.
+
+ if (entry.wrap(false).backup() && evictSync)
+ // Do not track backups if evicts are synchronized.
+ return !explicit;
+
+ try {
+ if (!cctx.isAll(entry, filter))
+ return false;
+
+ if (entry.lockedByAny())
+ return false;
+
+ // Add entry to eviction queue.
+ enqueue(entry, filter);
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Entry got removed while evicting [entry=" +
entry +
+ ", localNode=" + cctx.nodeId() + ']');
+ }
+ }
+ else {
+ if (obsoleteVer == null)
+ obsoleteVer = cctx.versions().next();
+
+ // Do not touch entry if not evicted:
+ // 1. If it is call from policy, policy tracks it on its own.
+ // 2. If it is explicit call, entry is touched on tx commit.
+ return evict0(cctx.cache(), entry, obsoleteVer, filter, explicit);
+ }
+
+ return true;
+ }
+
+ /**
+ * @param keys Keys to evict.
+ * @param obsoleteVer Obsolete version.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public void batchEvict(Collection<? extends K> keys, @Nullable
GridCacheVersion obsoleteVer) throws IgniteCheckedException {
+ assert !evictSyncAgr;
+ assert cctx.isSwapOrOffheapEnabled();
+
+ List<GridCacheEntryEx<K, V>> locked = new ArrayList<>(keys.size());
+
+ Collection<GridCacheBatchSwapEntry<K, V>> swapped = new
ArrayList<>(keys.size());
+
+ boolean recordable =
cctx.events().isRecordable(EVT_CACHE_ENTRY_EVICTED);
+
+ GridCacheAdapter<K, V> cache = cctx.cache();
+
+ Map<K, GridCacheEntryEx<K, V>> cached = U.newHashMap(keys.size());
+
+ // Get all participating entries to avoid deadlock.
+ for (K k : keys)
+ cached.put(k, cache.peekEx(k));
+
+ try {
+ for (K key : keys) {
+ GridCacheEntryEx<K, V> entry = cached.get(key);
+
+ // Do not evict internal entries.
+ if (entry == null || entry.key() instanceof GridCacheInternal)
+ continue;
+
+ // Lock entry.
+ unsafe.monitorEnter(entry);
+
+ locked.add(entry);
+
+ if (obsoleteVer == null)
+ obsoleteVer = cctx.versions().next();
+
+ GridCacheBatchSwapEntry<K, V> swapEntry =
entry.evictInBatchInternal(obsoleteVer);
+
+ if (swapEntry != null) {
+ swapped.add(swapEntry);
+
+ if (log.isDebugEnabled())
+ log.debug("Entry was evicted [entry=" + entry + ",
localNode=" + cctx.nodeId() + ']');
+ }
+ }
+
+ // Batch write to swap.
+ if (swapped != null)
+ cctx.swap().writeAll(swapped);
+ }
+ finally {
+ // Unlock entries in reverse order.
+ for (ListIterator<GridCacheEntryEx<K, V>> it =
locked.listIterator(locked.size()); it.hasPrevious();) {
+ GridCacheEntryEx<K, V> e = it.previous();
+
+ unsafe.monitorExit(e);
+ }
+
+ // Remove entries and fire events outside the locks.
+ for (GridCacheEntryEx<K, V> entry : locked) {
+ if (entry.obsolete()) {
+ entry.onMarkedObsolete();
+
+ cache.removeEntry(entry);
+
+ if (plcEnabled)
+ notifyPolicy(entry);
+
+ if (recordable)
+ cctx.events().addEvent(entry.partition(),
entry.key(), cctx.nodeId(), (IgniteUuid)null, null,
+ EVT_CACHE_ENTRY_EVICTED, null, false,
entry.rawGet(), entry.hasValue(), null, null, null);
+ }
+ }
+ }
+ }
+
+ /**
+ * Enqueues entry for synchronized eviction.
+ *
+ * @param entry Entry.
+ * @param filter Filter.
+ * @throws GridCacheEntryRemovedException If entry got removed.
+ */
+ private void enqueue(GridCacheEntryEx<K, V> entry,
IgnitePredicate<CacheEntry<K, V>>[] filter)
+ throws GridCacheEntryRemovedException {
+ Node<EvictionInfo> node = entry.meta(meta);
+
+ if (node == null) {
+ node = bufEvictQ.addLastx(new EvictionInfo(entry,
entry.version(), filter));
+
+ if (entry.putMetaIfAbsent(meta, node) != null)
+ // Was concurrently added, need to clear it from queue.
+ bufEvictQ.unlinkx(node);
+ else if (log.isDebugEnabled())
+ log.debug("Added entry to eviction queue: " + entry);
+ }
+ }
+
+ /**
+ * Checks eviction queue.
+ */
+ private void checkEvictionQueue() {
+ int maxSize = maxQueueSize();
+
+ int bufSize = bufEvictQ.sizex();
+
+ if (bufSize >= maxSize) {
+ if (log.isDebugEnabled())
+ log.debug("Processing eviction queue: " + bufSize);
+
+ Collection<EvictionInfo> evictInfos = new ArrayList<>(bufSize);
+
+ for (int i = 0; i < bufSize; i++) {
+ EvictionInfo info = bufEvictQ.poll();
+
+ if (info == null)
+ break;
+
+ evictInfos.add(info);
+ }
+
+ if (!evictInfos.isEmpty())
+ addToCurrentFuture(evictInfos);
+ }
+ }
+
+ /**
+ * @return Max queue size.
+ */
+ private int maxQueueSize() {
+ int size = (int)(cctx.cache().size() *
cctx.config().getEvictMaxOverflowRatio()) / 100;
+
+ if (size <= 0)
+ size = 500;
+
+ return Math.min(size,
cctx.config().getEvictSynchronizedKeyBufferSize());
+ }
+
+ /**
+ * Processes eviction queue (sends required requests, etc.).
+ *
+ * @param evictInfos Eviction information to create future with.
+ */
+ private void addToCurrentFuture(Collection<EvictionInfo> evictInfos) {
+ assert !evictInfos.isEmpty();
+
+ while (true) {
+ EvictionFuture fut = curEvictFut.get();
+
+ if (fut == null) {
+ curEvictFut.compareAndSet(null, new
EvictionFuture(cctx.kernalContext()));
+
+ continue;
+ }
+
+ if (fut.prepareLock()) {
+ boolean added;
+
+ try {
+ added = fut.add(evictInfos);
+ }
+ finally {
+ fut.prepareUnlock();
+ }
+
+ if (added) {
+ if (fut.prepare()) {
+ // Thread that prepares future should remove it and
install listener.
+ curEvictFut.compareAndSet(fut, null);
+
+ fut.listenAsync(new CI1<IgniteFuture<?>>() {
+ @Override public void apply(IgniteFuture<?> f) {
+ if (!busyLock.enterBusy()) {
+ if (log.isDebugEnabled())
+ log.debug("Will not notify eviction
future completion (grid is stopping): " +
+ f);
+
+ return;
+ }
+
+ try {
+ long topVer = lockTopology();
+
+ try {
+ onFutureCompleted((EvictionFuture)f,
topVer);
+ }
+ finally {
+ unlockTopology();
+ }
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ });
+ }
+
+ break;
+ }
+ else
+ // Infos were not added, create another future for next
iteration.
+ curEvictFut.compareAndSet(fut, new
EvictionFuture(cctx.kernalContext()));
+ }
+ else
+ // Future has not been locked, create another future for next
iteration.
+ curEvictFut.compareAndSet(fut, new
EvictionFuture(cctx.kernalContext()));
+ }
+ }
+
+ /**
+ * @param fut Completed eviction future.
+ * @param topVer Topology version on future complete.
+ */
+ private void onFutureCompleted(EvictionFuture fut, long topVer) {
+ if (!busyLock.enterBusy())
+ return;
+
+ try {
+ IgniteBiTuple<Collection<EvictionInfo>, Collection<EvictionInfo>>
t;
+
+ try {
+ t = fut.get();
+ }
+ catch (IgniteFutureCancelledException ignored) {
+ assert false : "Future has been cancelled, but manager is not
stopping: " + fut;
+
+ return;
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Eviction future finished with error (all
entries will be touched): " + fut, e);
+
+ if (plcEnabled) {
+ for (EvictionInfo info : fut.entries())
+ touch0(info.entry());
+ }
+
+ return;
+ }
+
+ // Check if topology version is different.
+ if (fut.topologyVersion() != topVer) {
+ if (log.isDebugEnabled())
+ log.debug("Topology has changed, all entries will be
touched: " + fut);
+
+ if (plcEnabled) {
+ for (EvictionInfo info : fut.entries())
+ touch0(info.entry());
+ }
+
+ return;
+ }
+
+ // Evict remotely evicted entries.
+ GridCacheVersion obsoleteVer = null;
+
+ Collection<EvictionInfo> evictedEntries = t.get1();
+
+ for (EvictionInfo info : evictedEntries) {
+ GridCacheEntryEx<K, V> entry = info.entry();
+
+ try {
+ // Remove readers on which the entry was evicted.
+ for (IgniteBiTuple<ClusterNode, Long> r :
fut.evictedReaders(entry.key())) {
+ UUID readerId = r.get1().id();
+ Long msgId = r.get2();
+
+ ((GridDhtCacheEntry<K,
V>)entry).removeReader(readerId, msgId);
+ }
+
+ if (obsoleteVer == null)
+ obsoleteVer = cctx.versions().next();
+
+ // Do not touch primary entries, if not evicted.
+ // They will be touched within updating transactions.
+ evict0(cctx.cache(), entry, obsoleteVer,
versionFilter(info), false);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to evict entry [entry=" + entry +
+ ", localNode=" + cctx.nodeId() + ']', e);
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Entry was concurrently removed while
evicting [entry=" + entry +
+ ", localNode=" + cctx.nodeId() + ']');
+ }
+ }
+
+ Collection<EvictionInfo> rejectedEntries = t.get2();
+
+ // Touch remotely rejected entries (only if policy is enabled).
+ if (plcEnabled && !rejectedEntries.isEmpty()) {
+ for (EvictionInfo info : rejectedEntries)
+ touch0(info.entry());
+ }
+ }
+ finally {
+ busyLock.leaveBusy();
+
+ // Signal on future completion.
+ signal();
+ }
+ }
+
+ /**
+ * This method should be called when eviction future is processed
+ * and unwind may continue.
+ */
+ private void signal() {
+ futsCntLock.lock();
+
+ try {
+ // Avoid volatile read on assertion.
+ int cnt = --activeFutsCnt;
+
+ assert cnt >= 0 : "Invalid futures count: " + cnt;
+
+ if (cnt < maxActiveFuts)
+ futsCntCond.signalAll();
+ }
+ finally {
+ futsCntLock.unlock();
+ }
+ }
+
+ /**
+ * @param info Eviction info.
+ * @return Version aware filter.
+ */
+ private IgnitePredicate<CacheEntry<K, V>>[] versionFilter(final
EvictionInfo info) {
+ // If version has changed since we started the whole process
+ // then we should not evict entry.
+ return cctx.vararg(new P1<CacheEntry<K, V>>() {
+ @Override public boolean apply(CacheEntry<K, V> e) {
+ GridCacheVersion ver = (GridCacheVersion)e.version();
+
+ return info.version().equals(ver) && F.isAll(info.filter());
+ }
+ });
+ }
+
+ /**
+ * Gets a collection of nodes to send eviction requests to.
+ *
+ *
+ * @param entry Entry.
+ * @param topVer Topology version.
+ * @return Tuple of two collections: dht (in case of partitioned cache)
nodes
+ * and readers (empty for replicated cache).
+ * @throws GridCacheEntryRemovedException If entry got removed during
method
+ * execution.
+ */
+ @SuppressWarnings( {"IfMayBeConditional"})
+ private IgniteBiTuple<Collection<ClusterNode>, Collection<ClusterNode>>
remoteNodes(GridCacheEntryEx<K, V> entry,
+ long topVer)
+ throws GridCacheEntryRemovedException {
+ assert entry != null;
+
+ assert cctx.config().getCacheMode() != LOCAL;
+
+ Collection<ClusterNode> backups;
+
+ if (evictSync)
+ backups = F.view(cctx.dht().topology().nodes(entry.partition(),
topVer), F0.notEqualTo(cctx.localNode()));
+ else
+ backups = Collections.emptySet();
+
+ Collection<ClusterNode> readers;
+
+ if (nearSync) {
+ readers = F.transform(((GridDhtCacheEntry<K, V>)entry).readers(),
new C1<UUID, ClusterNode>() {
+ @Nullable @Override public ClusterNode apply(UUID nodeId) {
+ return cctx.node(nodeId);
+ }
+ });
+ }
+ else
+ readers = Collections.emptySet();
+
+ return new IgnitePair<>(backups, readers);
+ }
+
+ /**
+ * Notifications.
+ */
+ public void unwind() {
+ if (!evictSyncAgr)
+ return;
+
+ if (!busyLock.enterBusy())
+ return;
+
+ try {
+ checkEvictionQueue();
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * @param e Entry to notify eviction policy.
+ */
+ @SuppressWarnings({"IfMayBeConditional", "RedundantIfStatement"})
+ private void notifyPolicy(GridCacheEntryEx<K, V> e) {
+ assert plcEnabled;
+ assert plc != null;
+ assert !e.isInternal() : "Invalid entry for policy notification: " +
e;
+
+ if (log.isDebugEnabled())
+ log.debug("Notifying eviction policy with entry: " + e);
+
+ if (filter == null || filter.evictAllowed(e.wrap(false)))
+ plc.onEntryAccessed(e.obsoleteOrDeleted(), e.evictWrap());
+ }
+
+ /**
+ *
+ */
+ @SuppressWarnings("TooBroadScope")
+ private void waitForEvictionFutures() {
+ if (activeFutsCnt >= maxActiveFuts) {
+ boolean interrupted = false;
+
+ futsCntLock.lock();
+
+ try {
+ while(!stopping && activeFutsCnt >= maxActiveFuts) {
+ try {
+ futsCntCond.await(2000, MILLISECONDS);
+ }
+ catch (InterruptedException ignored) {
+ interrupted = true;
+ }
+ }
+ }
+ finally {
+ futsCntLock.unlock();
+
+ if (interrupted)
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Prints out eviction stats.
+ */
+ public void printStats() {
+ X.println("Eviction stats [grid=" + cctx.gridName() + ", cache=" +
cctx.cache().name() +
+ ", buffEvictQ=" + bufEvictQ.sizex() + ']');
+ }
+
+ /** {@inheritDoc} */
+ @Override public void printMemoryStats() {
+ X.println(">>> ");
+ X.println(">>> Eviction manager memory stats [grid=" +
cctx.gridName() + ", cache=" + cctx.name() + ']');
+ X.println(">>> buffEvictQ size: " + bufEvictQ.sizex());
+ X.println(">>> futsSize: " + futs.size());
+ X.println(">>> futsCreated: " + idGen.get());
+ }
+
+ /**
+ *
+ */
+ private class BackupWorker extends GridWorker {
+ /** */
+ private final BlockingQueue<IgniteDiscoveryEvent> evts = new
LinkedBlockingQueue<>();
+
+ /** */
+ private final Collection<Integer> primaryParts = new HashSet<>();
+
+ /**
+ *
+ */
+ private BackupWorker() {
+ super(cctx.gridName(), "cache-eviction-backup-worker", log);
+
+ assert plcEnabled;
+ }
+
+ /**
+ * @param evt New event.
+ */
+ void addEvent(IgniteDiscoveryEvent evt) {
+ assert evt != null;
+
+ evts.add(evt);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException,
IgniteInterruptedException {
+ try {
+ assert !cctx.isNear() && evictSync;
+
+ ClusterNode loc = cctx.localNode();
+
+ // Initialize.
+
primaryParts.addAll(cctx.affinity().primaryPartitions(cctx.localNodeId(),
+ cctx.affinity().affinityTopologyVersion()));
+
+ while (!isCancelled()) {
+ IgniteDiscoveryEvent evt = evts.take();
+
+ if (log.isDebugEnabled())
+ log.debug("Processing event: " + evt);
+
+ // Remove partitions that are no longer primary.
+ for (Iterator<Integer> it = primaryParts.iterator();
it.hasNext();) {
+ if (!evts.isEmpty())
+ break;
+
+ if (!cctx.affinity().primary(loc, it.next(),
evt.topologyVersion()))
+ it.remove();
+ }
+
+ // Move on to next event.
+ if (!evts.isEmpty())
+ continue;
+
+ for (GridDhtLocalPartition<K, V> part :
cctx.topology().localPartitions()) {
+ if (!evts.isEmpty())
+ break;
+
+ if (part.primary(evt.topologyVersion()) &&
primaryParts.add(part.id())) {
+ if (log.isDebugEnabled())
+ log.debug("Touching partition entries: " +
part);
+
+ touchOnTopologyChange(part.entries());
+ }
+ }
+ }
+ }
+ catch (InterruptedException ignored) {
+ // No-op.
+ }
+ catch (IgniteException e) {
+ if (!e.hasCause(InterruptedException.class))
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Wrapper around an entry to be put into queue.
+ */
+ private class EvictionInfo {
+ /** Cache entry. */
+ private GridCacheEntryEx<K, V> entry;
+
+ /** Start version. */
+ private GridCacheVersion ver;
+
+ /** Filter to pass before entry will be evicted. */
+ private IgnitePredicate<CacheEntry<K, V>>[] filter;
+
+ /**
+ * @param entry Entry.
+ * @param ver Version.
+ * @param filter Filter.
+ */
+ EvictionInfo(GridCacheEntryEx<K, V> entry, GridCacheVersion ver,
+ IgnitePredicate<CacheEntry<K, V>>[] filter) {
+ assert entry != null;
+ assert ver != null;
+
+ this.entry = entry;
+ this.ver = ver;
+ this.filter = filter;
+ }
+
+ /**
+ * @return Entry.
+ */
+ GridCacheEntryEx<K, V> entry() {
+ return entry;
+ }
+
+ /**
+ * @return Version.
+ */
+ GridCacheVersion version() {
+ return ver;
+ }
+
+ /**
+ * @return Filter.
+ */
+ IgnitePredicate<CacheEntry<K, V>>[] filter() {
+ return filter;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(EvictionInfo.class, this);
+ }
+ }
+
+ /**
+ * Future for synchronized eviction. Result is a tuple: {evicted entries,
rejected entries}.
+ */
+ private class EvictionFuture extends
GridFutureAdapter<IgniteBiTuple<Collection<EvictionInfo>,
+ Collection<EvictionInfo>>> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final long id = idGen.incrementAndGet();
+
+ /** */
+ private ConcurrentLinkedDeque8<EvictionInfo> evictInfos = new
ConcurrentLinkedDeque8<>();
+
+ /** */
+ private final ConcurrentMap<K, EvictionInfo> entries = new
ConcurrentHashMap8<>();
+
+ /** */
+ private final ConcurrentMap<K, Collection<ClusterNode>> readers =
+ new ConcurrentHashMap8<>();
+
+ /** */
+ private final Collection<EvictionInfo> evictedEntries = new
GridConcurrentHashSet<>();
+
+ /** */
+ private final ConcurrentMap<K, EvictionInfo> rejectedEntries = new
ConcurrentHashMap8<>();
+
+ /** Request map. */
+ private final ConcurrentMap<UUID, GridCacheEvictionRequest<K, V>>
reqMap =
+ new ConcurrentHashMap8<>();
+
+ /** Response map. */
+ private final ConcurrentMap<UUID, GridCacheEvictionResponse<K, V>>
resMap =
+ new ConcurrentHashMap8<>();
+
+ /** To make sure that future is completing within a single thread. */
+ private final AtomicBoolean finishPrepare = new AtomicBoolean();
+
+ /** Lock to use when future is being initialized. */
+ @GridToStringExclude
+ private final ReadWriteLock prepareLock = new
ReentrantReadWriteLock();
+
+ /** To make sure that future is completing within a single thread. */
+ private final AtomicBoolean completing = new AtomicBoolean();
+
+ /** Lock to use after future is initialized. */
+ @GridToStringExclude
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /** Object to force future completion on elapsing eviction timeout. */
+ @GridToStringExclude
+ private GridTimeoutObject timeoutObj;
+
+ /** Topology version future is processed on. */
+ private long topVer;
+
+ /**
+ * @param ctx Context.
+ */
+ EvictionFuture(GridKernalContext ctx) {
+ super(ctx);
+ }
+
+ /**
+ * Required by {@code Externalizable}.
+ */
+ public EvictionFuture() {
+ assert false : "This should never happen.";
+ }
+
+ /**
+ * @return {@code True} if prepare lock was acquired.
+ */
+ boolean prepareLock() {
+ return prepareLock.readLock().tryLock();
+ }
+
+ /**
+ *
+ */
+ void prepareUnlock() {
+ prepareLock.readLock().unlock();
+ }
+
+ /**
+ * @param infos Infos to add.
+ * @return {@code False} if entries were not added due to capacity
restrictions.
+ */
+ boolean add(Collection<EvictionInfo> infos) {
+ assert infos != null && !infos.isEmpty();
+
+ if (evictInfos.sizex() > maxQueueSize())
+ return false;
+
+ evictInfos.addAll(infos);
+
+ return true;
+ }
+
+ /**
+ * @return {@code True} if future has been prepared by this call.
+ */
+ @SuppressWarnings("LockAcquiredButNotSafelyReleased")
+ boolean prepare() {
+ if (evictInfos.sizex() >= maxQueueSize() &&
finishPrepare.compareAndSet(false, true)) {
+ // Lock will never be released intentionally.
+ prepareLock.writeLock().lock();
+
+ futsCntLock.lock();
+
+ try {
+ activeFutsCnt++;
+ }
+ finally {
+ futsCntLock.unlock();
+ }
+
+ // Put future in map.
+ futs.put(id, this);
+
+ prepare0();
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Prepares future (sends all required requests).
+ */
+ private void prepare0() {
+ if (log.isDebugEnabled())
+ log.debug("Preparing eviction future [futId=" + id + ",
localNode=" + cctx.nodeId() +
+ ", infos=" + evictInfos + ']');
+
+ assert evictInfos != null && !evictInfos.isEmpty();
+
+ topVer = lockTopology();
+
+ try {
+ Collection<EvictionInfo> locals = null;
+
+ for (EvictionInfo info : evictInfos) {
+ // Queue node may have been stored in entry metadata
concurrently, but we don't care
+ // about it since we are currently processing this entry.
+ Node<EvictionInfo> queueNode =
info.entry().removeMeta(meta);
+
+ if (queueNode != null)
+ bufEvictQ.unlinkx(queueNode);
+
+ IgniteBiTuple<Collection<ClusterNode>,
Collection<ClusterNode>> tup;
+
+ try {
+ tup = remoteNodes(info.entry(), topVer);
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Entry got removed while preparing
eviction future (ignoring) [entry=" +
+ info.entry() + ", nodeId=" + cctx.nodeId() +
']');
+
+ continue;
+ }
+
+ Collection<ClusterNode> entryReaders =
+ F.addIfAbsent(readers, info.entry().key(), new
GridConcurrentHashSet<ClusterNode>());
+
+ assert entryReaders != null;
+
+ // Add entry readers so that we could remove them right
before local eviction.
+ entryReaders.addAll(tup.get2());
+
+ Collection<ClusterNode> nodes = F.concat(true,
tup.get1(), tup.get2());
+
+ if (!nodes.isEmpty()) {
+ entries.put(info.entry().key(), info);
+
+ // There are remote participants.
+ for (ClusterNode node : nodes) {
+ GridCacheEvictionRequest<K, V> req =
F.addIfAbsent(reqMap, node.id(),
+ new GridCacheEvictionRequest<K,
V>(cctx.cacheId(), id, evictInfos.size(), topVer));
+
+ assert req != null;
+
+ req.addKey(info.entry().key(), info.version(),
entryReaders.contains(node));
+ }
+ }
+ else {
+ if (locals == null)
+ locals = new HashSet<>(evictInfos.size(), 1.0f);
+
+ // There are no remote participants, need to keep the
entry as local.
+ locals.add(info);
+ }
+ }
+
+ if (locals != null) {
+ // Evict entries without remote participant nodes
immediately.
+ GridCacheVersion obsoleteVer = cctx.versions().next();
+
+ for (EvictionInfo info : locals) {
+ if (log.isDebugEnabled())
+ log.debug("Evicting key without remote
participant nodes: " + info);
+
+ try {
+ // Touch primary entry (without backup nodes) if
not evicted
+ // to keep tracking.
+ if (!evict0(cctx.cache(), info.entry(),
obsoleteVer, versionFilter(info), false) &&
+ plcEnabled)
+ touch0(info.entry());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to evict entry: " +
info.entry(), e);
+ }
+ }
+ }
+
+ // If there were only local entries.
+ if (entries.isEmpty()) {
+ complete(false);
+
+ return;
+ }
+ }
+ finally {
+ unlockTopology();
+ }
+
+ // Send eviction requests.
+ for (Map.Entry<UUID, GridCacheEvictionRequest<K, V>> e :
reqMap.entrySet()) {
+ UUID nodeId = e.getKey();
+
+ GridCacheEvictionRequest<K, V> req = e.getValue();
+
+ if (log.isDebugEnabled())
+ log.debug("Sending eviction request [node=" + nodeId + ",
req=" + req + ']');
+
+ try {
+ cctx.io().send(nodeId, req);
+ }
+ catch (ClusterTopologyException ignored) {
+ // Node left the topology.
+ onNodeLeft(nodeId);
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to send eviction request to node
[node=" + nodeId + ", req=" + req + ']', ex);
+
+ rejectEntries(nodeId);
+ }
+ }
+
+ registerTimeoutObject();
+ }
+
+ /**
+ *
+ */
+ private void registerTimeoutObject() {
+ // Check whether future has not been completed yet.
+ if (lock.readLock().tryLock()) {
+ try {
+ timeoutObj = new
GridTimeoutObjectAdapter(cctx.config().getEvictSynchronizedTimeout()) {
+ @Override public void onTimeout() {
+ complete(true);
+ }
+ };
+
+ cctx.time().addTimeoutObject(timeoutObj);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+ }
+
+ /**
+ * @return Future ID.
+ */
+ long id() {
+ return id;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ long topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return Keys to readers mapping.
+ */
+ Map<K, Collection<ClusterNode>> readers() {
+ return readers;
+ }
+
+ /**
+ * @return All entries associated with future that should be evicted
(or rejected).
+ */
+ Collection<EvictionInfo> entries() {
+ return entries.values();
+ }
+
+ /**
+ * Reject all entries on behalf of specified node.
+ *
+ * @param nodeId Node ID.
+ */
+ private void rejectEntries(UUID nodeId) {
+ assert nodeId != null;
+
+ if (lock.readLock().tryLock()) {
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Rejecting entries for node: " + nodeId);
+
+ GridCacheEvictionRequest<K, V> req =
reqMap.remove(nodeId);
+
+ for (GridTuple3<K, GridCacheVersion, Boolean> t :
req.entries()) {
+ EvictionInfo info = entries.get(t.get1());
+
+ assert info != null;
+
+ rejectedEntries.put(t.get1(), info);
+ }
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ checkDone();
+ }
+
+ /**
+ * @param nodeId Node id that left the topology.
+ */
+ void onNodeLeft(UUID nodeId) {
+ assert nodeId != null;
+
+ if (lock.readLock().tryLock()) {
+ try {
+ // Stop waiting response from this node.
+ reqMap.remove(nodeId);
+
+ resMap.remove(nodeId);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+
+ checkDone();
+ }
+ }
+
+ /**
+ * @param nodeId Sender node ID.
+ * @param res Response.
+ */
+ void onResponse(UUID nodeId, GridCacheEvictionResponse<K, V> res) {
+ assert nodeId != null;
+ assert res != null;
+
+ if (lock.readLock().tryLock()) {
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Entered to eviction future onResponse()
[fut=" + this + ", node=" + nodeId +
+ ", res=" + res + ']');
+
+ ClusterNode node = cctx.node(nodeId);
+
+ if (node != null)
+ resMap.put(nodeId, res);
+ else
+ // Sender node left grid.
+ reqMap.remove(nodeId);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+
+ if (res.error())
+ // Complete future, since there was a class loading error
on at least one node.
+ complete(false);
+ else
+ checkDone();
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Ignored eviction response [fut=" + this + ",
node=" + nodeId + ", res=" + res + ']');
+ }
+ }
+
+ /**
+ *
+ */
+ private void checkDone() {
+ if (reqMap.isEmpty() ||
resMap.keySet().containsAll(reqMap.keySet()))
+ complete(false);
+ }
+
+ /**
+ * Completes future.
+ *
+ * @param timedOut {@code True} if future is being forcibly completed
on timeout.
+ */
+ @SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
+ private void complete(boolean timedOut) {
+ if (completing.compareAndSet(false, true)) {
+ // Lock will never be released intentionally.
+ lock.writeLock().lock();
+
+ futs.remove(id);
+
+ if (timeoutObj != null && !timedOut)
+ // If future is timed out, corresponding object is
already removed.
+ cctx.time().removeTimeoutObject(timeoutObj);
+
+ if (log.isDebugEnabled())
+ log.debug("Building eviction future result [fut=" + this
+ ", timedOut=" + timedOut + ']');
+
+ boolean err = F.forAny(resMap.values(), new
P1<GridCacheEvictionResponse<K, V>>() {
+ @Override public boolean
apply(GridCacheEvictionResponse<K, V> res) {
+ return res.error();
+ }
+ });
+
+ if (err) {
+ Collection<UUID> ids = F.view(resMap.keySet(), new
P1<UUID>() {
+ @Override public boolean apply(UUID e) {
+ return resMap.get(e).error();
+ }
+ });
+
+ assert !ids.isEmpty();
+
+ U.warn(log, "Remote node(s) failed to process eviction
request " +
+ "due to topology changes " +
+ "(some backup or remote values maybe lost): " + ids);
+ }
+
+ if (timedOut)
+ U.warn(log, "Timed out waiting for eviction future " +
+ "(consider changing 'evictSynchronousTimeout' and
'evictSynchronousConcurrencyLevel' " +
+ "configuration parameters).");
+
+ if (err || timedOut) {
+ // Future has not been completed successfully, all
entries should be rejected.
+ assert evictedEntries.isEmpty();
+
+ rejectedEntries.putAll(entries);
+ }
+ else {
+ // Copy map to filter remotely rejected entries,
+ // as they will be touched within corresponding txs.
+ Map<K, EvictionInfo> rejectedEntries0 = new
HashMap<>(rejectedEntries);
+
+ // Future has been completed successfully - build result.
+ for (EvictionInfo info : entries.values()) {
+ K key = info.entry().key();
+
+ if (rejectedEntries0.containsKey(key))
+ // Was already rejected.
+ continue;
+
+ boolean rejected = false;
+
+ for (GridCacheEvictionResponse<K, V> res :
resMap.values()) {
+ if (res.rejectedKeys().contains(key)) {
+ // Modify copied map.
+ rejectedEntries0.put(key, info);
+
+ rejected = true;
+
+ break;
+ }
+ }
+
+ if (!rejected)
+ evictedEntries.add(info);
+ }
+ }
+
+ // Pass entries that were rejected due to topology changes
+ // or due to timeout or class loading issues.
+ // Remotely rejected entries will be touched within
corresponding txs.
+ onDone(F.t(evictedEntries, rejectedEntries.values()));
+ }
+ }
+
+ /**
+ * @param key Key.
+ * @return Reader nodes on which given key was evicted.
+ */
+ Collection<IgniteBiTuple<ClusterNode, Long>> evictedReaders(K key) {
+ Collection<ClusterNode> mappedReaders = readers.get(key);
+
+ if (mappedReaders == null)
+ return Collections.emptyList();
+
+ Collection<IgniteBiTuple<ClusterNode, Long>> col = new
LinkedList<>();
+
+ for (Map.Entry<UUID, GridCacheEvictionResponse<K, V>> e :
resMap.entrySet()) {
+ ClusterNode node = cctx.node(e.getKey());
+
+ // If node has left or response did not arrive from near node
+ // then just skip it.
+ if (node == null || !mappedReaders.contains(node))
+ continue;
+
+ GridCacheEvictionResponse<K, V> res = e.getValue();
+
+ if (!res.rejectedKeys().contains(key))
+ col.add(F.t(node, res.messageId()));
+ }
+
+ return col;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("LockAcquiredButNotSafelyReleased")
+ @Override public boolean cancel() {
+ if (completing.compareAndSet(false, true)) {
+ // Lock will never be released intentionally.
+ lock.writeLock().lock();
+
+ if (timeoutObj != null)
+ cctx.time().removeTimeoutObject(timeoutObj);
+
+ boolean b = onCancelled();
+
+ assert b;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(EvictionFuture.class, this);
+ }
+ }
+ }