http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java index 0000000,8a69d1b..b76735a mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java @@@ -1,0 -1,1149 +1,1152 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.rest.handlers.cache; + + import org.apache.ignite.*; + import org.apache.ignite.cache.*; + import org.apache.ignite.cache.datastructures.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.resources.*; + import org.apache.ignite.transactions.*; + import org.apache.ignite.internal.processors.license.*; + import org.apache.ignite.internal.processors.rest.*; + import org.apache.ignite.internal.processors.rest.handlers.*; + import org.apache.ignite.internal.processors.rest.request.*; + import org.apache.ignite.internal.processors.task.*; + import org.apache.ignite.internal.util.future.*; + import org.apache.ignite.internal.util.lang.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jetbrains.annotations.*; + + import javax.cache.expiry.*; + import java.io.*; + import java.util.*; + import java.util.concurrent.*; + + import static java.util.concurrent.TimeUnit.*; + import static org.apache.ignite.transactions.IgniteTxConcurrency.*; + import static org.apache.ignite.transactions.IgniteTxIsolation.*; + import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; + import static org.apache.ignite.internal.processors.license.GridLicenseSubsystem.*; + + /** + * Command handler for API requests. + */ + public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { + /** Supported commands. */ + private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList( + CACHE_GET, + CACHE_GET_ALL, + CACHE_PUT, + CACHE_ADD, + CACHE_PUT_ALL, + CACHE_REMOVE, + CACHE_REMOVE_ALL, + CACHE_REPLACE, + CACHE_INCREMENT, + CACHE_DECREMENT, + CACHE_CAS, + CACHE_APPEND, + CACHE_PREPEND, + CACHE_METRICS + ); + + /** Requests with required parameter {@code key}. 
*/ + private static final EnumSet<GridRestCommand> KEY_REQUIRED_REQUESTS = EnumSet.of( + CACHE_GET, + CACHE_PUT, + CACHE_ADD, + CACHE_REMOVE, + CACHE_REPLACE, + CACHE_INCREMENT, + CACHE_DECREMENT, + CACHE_CAS, + CACHE_APPEND, + CACHE_PREPEND + ); + + /** */ + private static final CacheFlag[] EMPTY_FLAGS = new CacheFlag[0]; + + /** + * @param ctx Context. + */ + public GridCacheCommandHandler(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public Collection<GridRestCommand> supportedCommands() { + return SUPPORTED_COMMANDS; + } + + /** + * Retrieves cache flags from corresponding bits. + * + * @param cacheFlagsBits Integer representation of cache flags bit set. + * @return Array of cache flags. + */ + public static CacheFlag[] parseCacheFlags(int cacheFlagsBits) { + if (cacheFlagsBits == 0) + return EMPTY_FLAGS; + + EnumSet<CacheFlag> flagSet = EnumSet.noneOf(CacheFlag.class); + + if ((cacheFlagsBits & 1) != 0) + flagSet.add(CacheFlag.SKIP_STORE); + + if ((cacheFlagsBits & (1 << 1)) != 0) + flagSet.add(CacheFlag.SKIP_SWAP); + + if ((cacheFlagsBits & (1 << 2)) != 0) + flagSet.add(CacheFlag.SYNC_COMMIT); + + if ((cacheFlagsBits & (1 << 4)) != 0) + flagSet.add(CacheFlag.INVALIDATE); + + return flagSet.toArray(new CacheFlag[flagSet.size()]); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<GridRestResponse> handleAsync(final GridRestRequest req) { + assert req instanceof GridRestCacheRequest : "Invalid command for topology handler: " + req; + + assert SUPPORTED_COMMANDS.contains(req.command()); + + GridLicenseUseRegistry.onUsage(DATA_GRID, getClass()); + + if (log.isDebugEnabled()) + log.debug("Handling cache REST request: " + req); + + GridRestCacheRequest req0 = (GridRestCacheRequest)req; + + final String cacheName = req0.cacheName(); + + final Object key = req0.key(); + + final CacheFlag[] flags = parseCacheFlags(req0.cacheFlags()); + + try { + GridRestCommand cmd = req0.command(); + + if (key == null && KEY_REQUIRED_REQUESTS.contains(cmd)) + throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("key")); + + final Long ttl = req0.ttl(); + + IgniteFuture<GridRestResponse> fut; + + switch (cmd) { + case CACHE_GET: { + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, flags, key, + new GetCommand(key), req.portableMode()); + + break; + } + + case CACHE_GET_ALL: { + Set<Object> keys = req0.values().keySet(); + + if (F.isEmpty(keys)) + throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("keys")); + + // HashSet wrapping for correct serialization + keys = new HashSet<>(keys); + + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, flags, key, + new GetAllCommand(keys), req.portableMode()); + + break; + } + + case CACHE_PUT: { + final Object val = req0.value(); + + if (val == null) + throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("val")); + + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, flags, key, new + PutCommand(key, ttl, val), req.portableMode()); + + break; + } + + case CACHE_ADD: { + final Object val = req0.value(); + + if (val == null) + throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("val")); + + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, flags, key, + new AddCommand(key, ttl, val), req.portableMode()); + + break; + } + + case CACHE_PUT_ALL: { + Map<Object, Object> map = req0.values(); + + if (F.isEmpty(map)) + throw new 
IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("values")); + + for (Map.Entry<Object, Object> e : map.entrySet()) { + if (e.getKey() == null) + throw new IgniteCheckedException("Failing putAll operation (null keys are not allowed)."); + + if (e.getValue() == null) + throw new IgniteCheckedException("Failing putAll operation (null values are not allowed)."); + } + + // HashMap wrapping for correct serialization + map = new HashMap<>(map); + + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, flags, key, + new PutAllCommand(map), req.portableMode()); + + break; + } + + case CACHE_REMOVE: { + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, flags, key, + new RemoveCommand(key), req.portableMode()); + + break; + } + + case CACHE_REMOVE_ALL: { + Map<Object, Object> map = req0.values(); + + // HashSet wrapping for correct serialization + Set<Object> keys = map == null ? null : new HashSet<>(map.keySet()); + + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, flags, key, + new RemoveAllCommand(keys), req.portableMode()); + + break; + } + + case CACHE_REPLACE: { + final Object val = req0.value(); + + if (val == null) + throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("val")); + + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, flags, key, + new ReplaceCommand(key, ttl, val), req.portableMode()); + + break; + } + + case CACHE_INCREMENT: { + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, key, + new IncrementCommand(key, req0)); + + break; + } + + case CACHE_DECREMENT: { + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, key, + new DecrementCommand(key, req0)); + + break; + } + + case CACHE_CAS: { + final Object val1 = req0.value(); + final Object val2 = req0.value2(); + + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, flags, key, + new CasCommand(val2, val1, key), req.portableMode()); + + break; + } + + case CACHE_APPEND: { + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, flags, key, + new AppendCommand(key, req0), req.portableMode()); + + break; + } + + case CACHE_PREPEND: { + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, flags, key, + new PrependCommand(key, req0), req.portableMode()); + + break; + } + + case CACHE_METRICS: { + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, key, new MetricsCommand()); + + break; + } + + default: + throw new IllegalArgumentException("Invalid command for cache handler: " + req); + } + + return fut; + } + catch (IgniteException e) { + U.error(log, "Failed to execute cache command: " + req, e); + + return new GridFinishedFuture<>(ctx, e); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to execute cache command: " + req, e); + + return new GridFinishedFuture<>(ctx, e); + } + finally { + if (log.isDebugEnabled()) + log.debug("Handled cache REST request: " + req); + } + } + + /** + * Executes command on flagged cache projection. Checks {@code destId} to find + * if command could be performed locally or routed to a remote node. + * + * @param destId Target node Id for the operation. + * If {@code null} - operation could be executed anywhere. + * @param clientId Client ID. + * @param cacheName Cache name. + * @param flags Cache flags. + * @param key Key to set affinity mapping in the response. + * @param op Operation to perform. + * @param keepPortable Keep portable flag. 
+ * @return Operation result in future. + * @throws IgniteCheckedException If failed + */ + private IgniteFuture<GridRestResponse> executeCommand( + @Nullable UUID destId, + UUID clientId, + final String cacheName, + final CacheFlag[] flags, + final Object key, + final CacheProjectionCommand op, + final boolean keepPortable) throws IgniteCheckedException { + + final boolean locExec = + destId == null || destId.equals(ctx.localNodeId()) || replicatedCacheAvailable(cacheName); + + if (locExec) { + CacheProjection<?,?> prj = localCache(cacheName).forSubjectId(clientId).flagsOn(flags); + + if (keepPortable) + prj = prj.keepPortable(); + + return op.apply((CacheProjection<Object, Object>)prj, ctx). + chain(resultWrapper((CacheProjection<Object, Object>)prj, key)); + } + else { + ClusterGroup prj = ctx.grid().forPredicate(F.nodeForNodeId(destId)); + + IgniteCompute comp = ctx.grid().compute(prj).withNoFailover().enableAsync(); + + comp.call(new FlaggedCacheOperationCallable(clientId, cacheName, flags, op, key, keepPortable)); + + return comp.future(); + } + } + + /** + * Executes command on cache. Checks {@code destId} to find + * if command could be performed locally or routed to a remote node. + * + * @param destId Target node Id for the operation. + * If {@code null} - operation could be executed anywhere. + * @param clientId Client ID. + * @param cacheName Cache name. + * @param key Key to set affinity mapping in the response. + * @param op Operation to perform. + * @return Operation result in future. + * @throws IgniteCheckedException If failed + */ + private IgniteFuture<GridRestResponse> executeCommand( + @Nullable UUID destId, + UUID clientId, + final String cacheName, + final Object key, + final CacheCommand op) throws IgniteCheckedException { + final boolean locExec = destId == null || destId.equals(ctx.localNodeId()) || + ctx.cache().cache(cacheName) != null; + + if (locExec) { + final CacheProjection<Object, Object> cache = localCache(cacheName).forSubjectId(clientId); + + return op.apply(cache, ctx).chain(resultWrapper(cache, key)); + } + else { + ClusterGroup prj = ctx.grid().forPredicate(F.nodeForNodeId(destId)); + + IgniteCompute comp = ctx.grid().compute(prj).withNoFailover().enableAsync(); + + comp.call(new CacheOperationCallable(clientId, cacheName, op, key)); + + return comp.future(); + } + } + + /** + * Handles increment and decrement commands. + * + * @param cache Cache. + * @param key Key. + * @param req Request. + * @param decr Whether to decrement (increment otherwise). + * @return Future of operation result. + * @throws IgniteCheckedException In case of error. + */ + private static IgniteFuture<?> incrementOrDecrement(CacheProjection<Object, Object> cache, String key, + GridRestCacheRequest req, final boolean decr) throws IgniteCheckedException { + assert cache != null; + assert key != null; + assert req != null; + + Long init = req.initial(); + Long delta = req.delta(); + + if (delta == null) + throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("delta")); + + final CacheAtomicLong l = cache.cache().dataStructures().atomicLong(key, init != null ? init : 0, true); + + final Long d = delta; + + return ((GridKernal)cache.gridProjection().ignite()).context().closure().callLocalSafe(new Callable<Object>() { + @Override public Object call() throws Exception { + return l.addAndGet(decr ? -d : d); + } + }, false); + } + + /** + * Handles append and prepend commands. + * + * @param ctx Kernal context. + * @param cache Cache. + * @param key Key. 
+ * @param req Request. + * @param prepend Whether to prepend. + * @return Future of operation result. + * @throws IgniteCheckedException In case of any exception. + */ + private static IgniteFuture<?> appendOrPrepend( + final GridKernalContext ctx, + final CacheProjection<Object, Object> cache, + final Object key, GridRestCacheRequest req, final boolean prepend) throws IgniteCheckedException { + assert cache != null; + assert key != null; + assert req != null; + + final Object val = req.value(); + + if (val == null) + throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("val")); + + return ctx.closure().callLocalSafe(new Callable<Object>() { + @Override public Object call() throws Exception { + try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + Object curVal = cache.get(key); + + if (curVal == null) + return false; + + // Modify current value with appendix one. + Object newVal = appendOrPrepend(curVal, val, !prepend); + + // Put new value asynchronously. + cache.putx(key, newVal); + + tx.commit(); + } + + return true; + } + }, false); + } + + /** + * Append or prepend new value to the current one. + * + * @param origVal Original value. + * @param appendVal Appendix value to add to the original one. + * @param appendPlc Append or prepend policy flag. + * @return Resulting value. + * @throws IgniteCheckedException In case of grid exceptions. + */ + private static Object appendOrPrepend(Object origVal, Object appendVal, boolean appendPlc) throws IgniteCheckedException { + // Strings. + if (appendVal instanceof String && origVal instanceof String) + return appendPlc ? origVal + (String)appendVal : (String)appendVal + origVal; + + // Maps. + if (appendVal instanceof Map && origVal instanceof Map) { + Map<Object, Object> origMap = (Map<Object, Object>)origVal; + Map<Object, Object> appendMap = (Map<Object, Object>)appendVal; + + Map<Object, Object> map = X.cloneObject(origMap, false, true); + + if (appendPlc) + map.putAll(appendMap); // Append. + else { + map.clear(); + map.putAll(appendMap); // Prepend. + map.putAll(origMap); + } + + for (Map.Entry<Object, Object> e : appendMap.entrySet()) // Remove zero-valued entries. + if (e.getValue() == null && map.get(e.getKey()) == null) + map.remove(e.getKey()); + + return map; + } + + // Generic collection. + if (appendVal instanceof Collection<?> && origVal instanceof Collection<?>) { + Collection<Object> origCol = (Collection<Object>)origVal; + Collection<Object> appendCol = (Collection<Object>)appendVal; + + Collection<Object> col = X.cloneObject(origCol, false, true); + + if (appendPlc) + col.addAll(appendCol); // Append. + else { + col.clear(); + col.addAll(appendCol); // Prepend. + col.addAll(origCol); + } + + return col; + } + + throw new IgniteCheckedException("Incompatible types [appendVal=" + appendVal + ", old=" + origVal + ']'); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheCommandHandler.class, this); + } + + /** + * Creates a transformation function from {@link CacheCommand}'s results into {@link GridRestResponse}. + * + * @param c Cache instance to obtain affinity data. + * @param key Affinity key for previous operation. + * @return Rest response. 
+ */ + private static IgniteClosure<IgniteFuture<?>, GridRestResponse> resultWrapper( + final CacheProjection<Object, Object> c, @Nullable final Object key) { + return new CX1<IgniteFuture<?>, GridRestResponse>() { + @Override public GridRestResponse applyx(IgniteFuture<?> f) throws IgniteCheckedException { + GridCacheRestResponse resp = new GridCacheRestResponse(); + + resp.setResponse(f.get()); + + if (key != null) + resp.setAffinityNodeId(c.cache().affinity().mapKeyToNode(key).id().toString()); + + return resp; + } + }; + } + + /** + * @param cacheName Cache name. + * @return If replicated cache with given name is locally available. + */ + private boolean replicatedCacheAvailable(String cacheName) { + GridCacheAdapter<Object,Object> cache = ctx.cache().internalCache(cacheName); + + return cache != null && cache.configuration().getCacheMode() == CacheMode.REPLICATED; + } + + /** + * Used for test purposes. + * + * @param cacheName Name of the cache. + * @return Instance on the named cache. + * @throws IgniteCheckedException If cache not found. + */ + protected GridCacheProjectionEx<Object, Object> localCache(String cacheName) throws IgniteCheckedException { + GridCacheProjectionEx<Object, Object> cache = (GridCacheProjectionEx<Object, Object>)ctx.cache().cache(cacheName); + + if (cache == null) + throw new IgniteCheckedException( + "Failed to find cache for given cache name (null for default cache): " + cacheName); + + return cache; + } + + /** + * @param ignite Grid instance. + * @param cacheName Name of the cache. + * @return Instance on the named cache. + * @throws IgniteCheckedException If cache not found. + */ + private static GridCacheProjectionEx<Object, Object> cache(Ignite ignite, String cacheName) throws IgniteCheckedException { + GridCache<Object, Object> cache = ignite.cache(cacheName); + + if (cache == null) + throw new IgniteCheckedException( + "Failed to find cache for given cache name (null for default cache): " + cacheName); + + return (GridCacheProjectionEx<Object, Object>)cache; + } + + /** + * Fixed result closure. + */ + private static final class FixedResult extends CX1<IgniteFuture<?>, Object> { + /** */ + private static final long serialVersionUID = 0L; + + /** Closure result. */ + private final Object res; + + /** + * @param res Closure result. + */ + private FixedResult(Object res) { + this.res = res; + } + + /** {@inheritDoc} */ + @Override public Object applyx(IgniteFuture<?> f) throws IgniteCheckedException { + f.get(); + + return res; + } + } + + /** + * Type alias. + */ + private abstract static class CacheCommand + extends IgniteClosure2X<CacheProjection<Object, Object>, GridKernalContext, IgniteFuture<?>> { + /** */ + private static final long serialVersionUID = 0L; + + // No-op. + } + + /** + * Type alias. + */ + private abstract static class CacheProjectionCommand + extends IgniteClosure2X<CacheProjection<Object, Object>, GridKernalContext, IgniteFuture<?>> { + /** */ + private static final long serialVersionUID = 0L; + + // No-op. + } + + /** + * Class for flagged cache operations. + */ + @GridInternal + private static class FlaggedCacheOperationCallable implements Callable<GridRestResponse>, Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Client ID. 
*/ + private UUID clientId; + + /** */ + private final String cacheName; + + /** */ + private final CacheFlag[] flags; + + /** */ + private final CacheProjectionCommand op; + + /** */ + private final Object key; + + /** */ + private final boolean keepPortable; + + /** */ + @IgniteInstanceResource + private Ignite g; + + /** + * @param clientId Client ID. + * @param cacheName Cache name. + * @param flags Flags. + * @param op Operation. + * @param key Key. + * @param keepPortable Keep portable flag. + */ + private FlaggedCacheOperationCallable(UUID clientId, String cacheName, CacheFlag[] flags, + CacheProjectionCommand op, Object key, boolean keepPortable) { + this.clientId = clientId; + this.cacheName = cacheName; + this.flags = flags; + this.op = op; + this.key = key; + this.keepPortable = keepPortable; + } + + /** {@inheritDoc} */ + @Override public GridRestResponse call() throws Exception { + CacheProjection<?, ?> prj = cache(g, cacheName).forSubjectId(clientId).flagsOn(flags); + + if (keepPortable) + prj = prj.keepPortable(); + + // Need to apply both operation and response transformation remotely + // as cache could be inaccessible on local node and + // exception processing should be consistent with local execution. + return op.apply((CacheProjection<Object, Object>)prj, ((GridKernal)g).context()). + chain(resultWrapper((CacheProjection<Object, Object>)prj, key)).get(); + } + } + + /** + * Class for cache operations. + */ + @GridInternal + private static class CacheOperationCallable implements Callable<GridRestResponse>, Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Client ID. */ + private UUID clientId; + + /** */ + private final String cacheName; + + /** */ + private final CacheCommand op; + + /** */ + private final Object key; + + /** */ + @IgniteInstanceResource + private Ignite g; + + /** + * @param clientId Client ID. + * @param cacheName Cache name. + * @param op Operation. + * @param key Key. + */ + private CacheOperationCallable(UUID clientId, String cacheName, CacheCommand op, Object key) { + this.clientId = clientId; + this.cacheName = cacheName; + this.op = op; + this.key = key; + } + + /** {@inheritDoc} */ + @Override public GridRestResponse call() throws Exception { + final CacheProjection<Object, Object> cache = cache(g, cacheName).forSubjectId(clientId); + + // Need to apply both operation and response transformation remotely + // as cache could be inaccessible on local node and + // exception processing should be consistent with local execution. + return op.apply(cache, ((GridKernal)g).context()).chain(resultWrapper(cache, key)).get(); + } + } + + /** */ + private static class GetCommand extends CacheProjectionCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Object key; + + /** + * @param key Key. + */ + GetCommand(Object key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> applyx(CacheProjection<Object, Object> c, GridKernalContext ctx) { + return c.getAsync(key); + } + } + + /** */ + private static class GetAllCommand extends CacheProjectionCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Collection<Object> keys; + + /** + * @param keys Keys. 
+ */ + GetAllCommand(Collection<Object> keys) { + this.keys = keys; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> applyx(CacheProjection<Object, Object> c, GridKernalContext ctx) { + return c.getAllAsync(keys); + } + } + + /** */ + private static class PutAllCommand extends CacheProjectionCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Map<Object, Object> map; + + /** + * @param map Objects to put. + */ + PutAllCommand(Map<Object, Object> map) { + this.map = map; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> applyx(CacheProjection<Object, Object> c, GridKernalContext ctx) { + return c.putAllAsync(map).chain(new FixedResult(true)); + } + } + + /** */ + private static class RemoveCommand extends CacheProjectionCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Object key; + + /** + * @param key Key. + */ + RemoveCommand(Object key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> applyx(CacheProjection<Object, Object> c, GridKernalContext ctx) { + return c.removexAsync(key); + } + } + + /** */ + private static class RemoveAllCommand extends CacheProjectionCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Collection<Object> keys; + + /** + * @param keys Keys to remove. + */ + RemoveAllCommand(Collection<Object> keys) { + this.keys = keys; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> applyx(CacheProjection<Object, Object> c, GridKernalContext ctx) { + return (F.isEmpty(keys) ? c.removeAllAsync() : c.removeAllAsync(keys)) + .chain(new FixedResult(true)); + } + } + + /** */ + private static class CasCommand extends CacheProjectionCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Object exp; + + /** */ + private final Object val; + + /** */ + private final Object key; + + /** + * @param exp Expected previous value. + * @param val New value. + * @param key Key. + */ + CasCommand(Object exp, Object val, Object key) { + this.val = val; + this.exp = exp; + this.key = key; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> applyx(CacheProjection<Object, Object> c, GridKernalContext ctx) { + return exp == null && val == null ? c.removexAsync(key) : + exp == null ? c.putxIfAbsentAsync(key, val) : + val == null ? c.removeAsync(key, exp) : + c.replaceAsync(key, exp, val); + } + } + + /** */ + private static class PutCommand extends CacheProjectionCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Object key; + + /** */ + private final Long ttl; + + /** */ + private final Object val; + + /** + * @param key Key. + * @param ttl TTL. + * @param val Value. 
+ */ + PutCommand(Object key, Long ttl, Object val) { + this.key = key; + this.ttl = ttl; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> applyx(CacheProjection<Object, Object> c, GridKernalContext ctx) { + if (ttl != null && ttl > 0) { + Duration duration = new Duration(MILLISECONDS, ttl); + + c = ((GridCacheProjectionEx<Object, Object>)c).withExpiryPolicy(new ModifiedExpiryPolicy(duration)); + } + + return c.putxAsync(key, val); + } + } + + /** */ + private static class AddCommand extends CacheProjectionCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Object key; + + /** */ + private final Long ttl; + + /** */ + private final Object val; + + /** + * @param key Key. + * @param ttl TTL. + * @param val Value. + */ + AddCommand(Object key, Long ttl, Object val) { + this.key = key; + this.ttl = ttl; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> applyx(CacheProjection<Object, Object> c, GridKernalContext ctx) { + if (ttl != null && ttl > 0) { + Duration duration = new Duration(MILLISECONDS, ttl); + + c = ((GridCacheProjectionEx<Object, Object>)c).withExpiryPolicy(new ModifiedExpiryPolicy(duration)); + } + + return c.putxIfAbsentAsync(key, val); + } + } + + /** */ + private static class ReplaceCommand extends CacheProjectionCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Object key; + + /** */ + private final Long ttl; + + /** */ + private final Object val; + + /** + * @param key Key. + * @param ttl TTL. + * @param val Value. + */ + ReplaceCommand(Object key, Long ttl, Object val) { + this.key = key; + this.ttl = ttl; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> applyx(CacheProjection<Object, Object> c, GridKernalContext ctx) { + if (ttl != null && ttl > 0) { + Duration duration = new Duration(MILLISECONDS, ttl); + + c = ((GridCacheProjectionEx<Object, Object>)c).withExpiryPolicy(new ModifiedExpiryPolicy(duration)); + } + + return c.replacexAsync(key, val); + } + } + + /** */ + private static class IncrementCommand extends CacheCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Object key; + + /** */ + private final GridRestCacheRequest req; + + /** + * @param key Key. + * @param req Operation request. + */ + IncrementCommand(Object key, GridRestCacheRequest req) { + this.key = key; + this.req = req; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> applyx(CacheProjection<Object, Object> c, GridKernalContext ctx) + throws IgniteCheckedException { + return incrementOrDecrement(c, (String)key, req, false); + } + } + + /** */ + private static class DecrementCommand extends CacheCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Object key; + + /** */ + private final GridRestCacheRequest req; + + /** + * @param key Key. + * @param req Operation request. 
+ */ + DecrementCommand(Object key, GridRestCacheRequest req) { + this.key = key; + this.req = req; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> applyx(CacheProjection<Object, Object> c, GridKernalContext ctx) throws IgniteCheckedException { + return incrementOrDecrement(c, (String)key, req, true); + } + } + + /** */ + private static class AppendCommand extends CacheProjectionCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Object key; + + /** */ + private final GridRestCacheRequest req; + + /** + * @param key Key. + * @param req Operation request. + */ + AppendCommand(Object key, GridRestCacheRequest req) { + this.key = key; + this.req = req; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> applyx(CacheProjection<Object, Object> c, GridKernalContext ctx) + throws IgniteCheckedException { + return appendOrPrepend(ctx, c, key, req, false); + } + } + + /** */ + private static class PrependCommand extends CacheProjectionCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Object key; + + /** */ + private final GridRestCacheRequest req; + + /** + * @param key Key. + * @param req Operation request. + */ + PrependCommand(Object key, GridRestCacheRequest req) { + this.key = key; + this.req = req; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> applyx(CacheProjection<Object, Object> c, GridKernalContext ctx) + throws IgniteCheckedException { + return appendOrPrepend(ctx, c, key, req, true); + } + } + + /** */ + private static class MetricsCommand extends CacheCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> applyx(CacheProjection<Object, Object> c, GridKernalContext ctx) { + CacheMetrics metrics = c.cache().metrics(); + + assert metrics != null; + + return new GridFinishedFuture<Object>(ctx, new GridCacheRestMetrics( - metrics.createTime(), metrics.readTime(), metrics.writeTime(), - metrics.reads(), metrics.writes(), metrics.hits(), metrics.misses())); ++ (int)metrics.getCacheGets(), ++ (int)(metrics.getCacheRemovals() + metrics.getCachePuts()), ++ (int)metrics.getCacheHits(), ++ (int)metrics.getCacheMisses()) ++ ); + } + } + }
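[Editor's note, not part of the commit] The reworked MetricsCommand above no longer reports createTime/readTime/writeTime; it derives the four REST counters from the JCache-style CacheMetrics getters named in the diff (getCacheGets, getCachePuts, getCacheRemovals, getCacheHits, getCacheMisses). The following minimal sketch illustrates that mapping only; the plain long parameters stand in for the real CacheMetrics getters, and the narrowing casts mirror the diff:

    // Sketch of the counter mapping used by MetricsCommand (illustrative only).
    // The long parameters stand in for CacheMetrics.getCacheGets(),
    // getCachePuts(), getCacheRemovals(), getCacheHits(), getCacheMisses().
    public final class RestMetricsMappingSketch {
        private RestMetricsMappingSketch() {
            // No instances.
        }

        /** REST "reads" field: total cache gets. */
        public static int reads(long cacheGets) {
            return (int)cacheGets;
        }

        /** REST "writes" field: puts plus removals. */
        public static int writes(long cachePuts, long cacheRemovals) {
            return (int)(cachePuts + cacheRemovals);
        }

        /** "hits" and "misses" pass through unchanged apart from the cast. */
        public static int narrow(long counter) {
            return (int)counter;
        }

        public static void main(String[] args) {
            // 120 gets, 80 puts, 20 removals, 90 hits, 30 misses
            // -> reads=120, writes=100, hits=90, misses=30.
            System.out.println("reads=" + reads(120)
                + ", writes=" + writes(80, 20)
                + ", hits=" + narrow(90)
                + ", misses=" + narrow(30));
        }
    }

Note that because GridCacheRestMetrics keeps int fields, the narrowing casts can overflow once a counter exceeds Integer.MAX_VALUE.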
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheRestMetrics.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheRestMetrics.java index 0000000,f7d45c0..d723800 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheRestMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheRestMetrics.java @@@ -1,0 -1,215 +1,142 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.rest.handlers.cache; + + import org.apache.ignite.internal.util.*; + + import java.util.*; + + /** + * Grid cache metrics for rest. + */ + public class GridCacheRestMetrics { - /** Create time. */ - private long createTime; - - /** Last read time. */ - private long readTime; - - /** Last update time. */ - private long writeTime; - + /** Number of reads. */ + private int reads; + + /** Number of writes. */ + private int writes; + + /** Number of hits. */ + private int hits; + + /** Number of misses. */ + private int misses; + + /** + * Constructor. + * - * @param createTime Create time. - * @param readTime Read time. - * @param writeTime Write time. + * @param reads Reads. + * @param writes Writes. + * @param hits Hits. + * @param misses Misses. + */ - public GridCacheRestMetrics(long createTime, long readTime, long writeTime, int reads, int writes, int hits, - int misses) { - this.createTime = createTime; - this.readTime = readTime; - this.writeTime = writeTime; ++ public GridCacheRestMetrics(int reads, int writes, int hits, int misses) { + this.reads = reads; + this.writes = writes; + this.hits = hits; + this.misses = misses; + } + + /** - * Gets create time. - * - * @return Create time. - */ - public long getCreateTime() { - return createTime; - } - - /** - * Sets create time. - * - * @param createTime Create time. - */ - public void setCreateTime(long createTime) { - this.createTime = createTime; - } - - /** - * Gets read time. - * - * @return Read time. - */ - public long getReadTime() { - return readTime; - } - - /** - * Sets read time. - * - * @param readTime Read time. - */ - public void setReadTime(long readTime) { - this.readTime = readTime; - } - - /** - * Gets write time. - * - * @return Write time. - */ - public long getWriteTime() { - return writeTime; - } - - /** - * Sets write time. - * - * @param writeTime Write time. - */ - public void setWriteTime(long writeTime) { - this.writeTime = writeTime; - } - - /** + * Gets reads. + * + * @return Reads. 
+ */ + public int getReads() { + return reads; + } + + /** + * Sets reads. + * + * @param reads Reads. + */ + public void setReads(int reads) { + this.reads = reads; + } + + /** + * Gets writes. + * + * @return Writes. + */ + public int getWrites() { + return writes; + } + + /** + * Sets writes. + * + * @param writes Writes. + */ + public void setWrites(int writes) { + this.writes = writes; + } + + /** + * Gets hits. + * + * @return Hits. + */ + public int getHits() { + return hits; + } + + /** + * Sets hits. + * + * @param hits Hits. + */ + public void setHits(int hits) { + this.hits = hits; + } + + /** + * Gets misses. + * + * @return Misses. + */ + public int getMisses() { + return misses; + } + + /** + * Sets misses. + * + * @param misses Misses. + */ + public void setMisses(int misses) { + this.misses = misses; + } + + /** + * Creates map with strings. + * + * @return Map. + */ + public Map<String, Long> map() { - Map<String, Long> map = new GridLeanMap<>(7); ++ Map<String, Long> map = new GridLeanMap<>(4); + - map.put("createTime", createTime); - map.put("readTime", readTime); - map.put("writeTime", writeTime); + map.put("reads", (long)reads); + map.put("writes", (long)writes); + map.put("hits", (long)hits); + map.put("misses", (long)misses); + + return map; + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java index 0000000,f8c76c3..6f97185 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java @@@ -1,0 -1,270 +1,267 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.visor.cache; + + import org.apache.ignite.cache.*; + import org.apache.ignite.internal.util.typedef.internal.*; + + import java.io.*; + + /** + * Data transfer object for {@link org.apache.ignite.cache.CacheMetrics}. + */ + public class VisorCacheMetrics implements Serializable { + /** */ ++ private static final int MICROSECONDS_IN_SECOND = 1_000_000; ++ ++ /** */ + private static final long serialVersionUID = 0L; + + /** Gets the number of all entries cached on this node. */ + private int size; + + /** Create time of the owning entity (either cache or entry). */ + private long createTm; + + /** Last write time of the owning entity (either cache or entry). */ + private long writeTm; + + /** Last read time of the owning entity (either cache or entry). 
*/ + private long readTm; + + /** Last time transaction was committed. */ + private long commitTm; + + /** Last time transaction was rollback. */ + private long rollbackTm; + + /** Total number of reads of the owning entity (either cache or entry). */ + private int reads; + + /** Total number of writes of the owning entity (either cache or entry). */ + private int writes; + + /** Total number of hits for the owning entity (either cache or entry). */ + private int hits; + + /** Total number of misses for the owning entity (either cache or entry). */ + private int misses; + + /** Total number of transaction commits. */ + private int txCommits; + + /** Total number of transaction rollbacks. */ + private int txRollbacks; + + /** Reads per second. */ + private int readsPerSec; + + /** Writes per second. */ + private int writesPerSec; + + /** Hits per second. */ + private int hitsPerSec; + + /** Misses per second. */ + private int missesPerSec; + + /** Commits per second. */ + private int commitsPerSec; + + /** Rollbacks per second. */ + private int rollbacksPerSec; + + /** Gets query metrics for cache. */ + private VisorCacheQueryMetrics qryMetrics; + + /** Calculate rate of metric per second. */ + private static int perSecond(int metric, long time, long createTime) { + long seconds = (time - createTime) / 1000; + + return (seconds > 0) ? (int)(metric / seconds) : 0; + } + + /** + * @param c Cache. + * @return Data transfer object for given cache metrics. + */ + public static VisorCacheMetrics from(GridCache c) { + VisorCacheMetrics cm = new VisorCacheMetrics(); + + CacheMetrics m = c.metrics(); + + cm.size = c.size(); + - cm.createTm = m.createTime(); - cm.writeTm = m.writeTime(); - cm.readTm = m.readTime(); - cm.commitTm = m.commitTime(); - cm.rollbackTm = m.rollbackTime(); - - cm.reads = m.reads(); - cm.writes = m.writes(); - cm.hits = m.hits(); - cm.misses = m.misses(); - - cm.txCommits = m.txCommits(); - cm.txRollbacks = m.txRollbacks(); - - cm.readsPerSec = perSecond(m.reads(), m.readTime(), m.createTime()); - cm.writesPerSec = perSecond(m.writes(), m.writeTime(), m.createTime()); - cm.hitsPerSec = perSecond (m.hits(), m.readTime(), m.createTime()); - cm.missesPerSec = perSecond(m.misses(), m.readTime(), m.createTime()); - cm.commitsPerSec = perSecond(m.txCommits(), m.commitTime(), m.createTime()); - cm.rollbacksPerSec = perSecond(m.txRollbacks(), m.rollbackTime(), m.createTime()); ++ cm.reads = (int)m.getCacheGets(); ++ cm.writes = (int)(m.getCachePuts() + m.getCacheRemovals()); ++ cm.hits = (int)m.getCacheHits(); ++ cm.misses = (int)m.getCacheMisses(); ++ ++ cm.txCommits = (int)m.getCacheTxCommits(); ++ cm.txRollbacks = (int)m.getCacheTxRollbacks(); ++ ++ cm.readsPerSec = (int)(MICROSECONDS_IN_SECOND * 1.f / m.getAverageGetTime()); ++ cm.writesPerSec = (int)(MICROSECONDS_IN_SECOND * 1.f / m.getAveragePutTime()); ++ cm.hitsPerSec = -1; ++ cm.missesPerSec = (int)(MICROSECONDS_IN_SECOND * 1.f / m.getAverageRemoveTime()); ++ cm.commitsPerSec = (int)(MICROSECONDS_IN_SECOND * 1.f / m.getAverageTxCommitTime()); ++ cm.rollbacksPerSec = (int)(MICROSECONDS_IN_SECOND * 1.f / m.getAverageTxRollbackTime()); + + cm.qryMetrics = VisorCacheQueryMetrics.from(c.queries().metrics()); + + return cm; + } + + /** + * @return Create time of the owning entity (either cache or entry). + */ + public long createTime() { + return createTm; + } + + /** + * @return Last write time of the owning entity (either cache or entry). 
+ */ + public long writeTime() { + return writeTm; + } + + /** + * @return Last read time of the owning entity (either cache or entry). + */ + public long readTime() { + return readTm; + } + + /** + * @return Last time transaction was committed. + */ + public long commitTime() { + return commitTm; + } + + /** + * @return Last time transaction was rollback. + */ + public long rollbackTime() { + return rollbackTm; + } + + /** + * @return Total number of reads of the owning entity (either cache or entry). + */ + public int reads() { + return reads; + } + + /** + * @return Total number of writes of the owning entity (either cache or entry). + */ + public int writes() { + return writes; + } + + /** + * @return Total number of hits for the owning entity (either cache or entry). + */ + public int hits() { + return hits; + } + + /** + * @return Total number of misses for the owning entity (either cache or entry). + */ + public int misses() { + return misses; + } + + /** + * @return Total number of transaction commits. + */ + public int txCommits() { + return txCommits; + } + + /** + * @return Total number of transaction rollbacks. + */ + public int txRollbacks() { + return txRollbacks; + } + + /** + * @return Reads per second. + */ + public int readsPerSecond() { + return readsPerSec; + } + + /** + * @return Writes per second. + */ + public int writesPerSecond() { + return writesPerSec; + } + + /** + * @return Hits per second. + */ + public int hitsPerSecond() { + return hitsPerSec; + } + + /** + * @return Misses per second. + */ + public int missesPerSecond() { + return missesPerSec; + } + + /** + * @return Commits per second. + */ + public int commitsPerSecond() { + return commitsPerSec; + } + + /** + * @return Rollbacks per second. + */ + public int rollbacksPerSecond() { + return rollbacksPerSec; + } + + /** + * @return Gets the number of all entries cached on this node. + */ + public int size() { + return size; + } + + /** + * @return Gets query metrics for cache. + */ + public VisorCacheQueryMetrics queryMetrics() { + return qryMetrics; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(VisorCacheMetrics.class, this); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheResetMetricsTask.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheResetMetricsTask.java index 0000000,3b0a695..b445034 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheResetMetricsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheResetMetricsTask.java @@@ -1,0 -1,69 +1,69 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.visor.cache; + + import org.apache.ignite.*; + import org.apache.ignite.cache.*; + import org.apache.ignite.internal.processors.task.*; + import org.apache.ignite.internal.visor.*; + import org.apache.ignite.internal.util.typedef.internal.*; + + /** + * Reset compute grid metrics. + */ + @GridInternal + public class VisorCacheResetMetricsTask extends VisorOneNodeTask<String, Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected VisorCacheResetMetricsJob job(String arg) { + return new VisorCacheResetMetricsJob(arg, debug); + } + + /** + * Job that reset cache metrics. + */ + private static class VisorCacheResetMetricsJob extends VisorJob<String, Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param arg Cache name to reset metrics for. + * @param debug Debug flag. + */ + private VisorCacheResetMetricsJob(String arg, boolean debug) { + super(arg, debug); + } + + /** {@inheritDoc} */ + @Override protected Void run(String cacheName) throws IgniteCheckedException { + GridCache cache = g.cachex(cacheName); + + if (cache != null) - cache.resetMetrics(); ++ cache.mxBean().clear(); + + return null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(VisorCacheResetMetricsJob.class, this); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java index 0000000,0e7c732..466e178 mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java @@@ -1,0 -1,218 +1,902 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache; + + import org.apache.ignite.*; + import org.apache.ignite.cache.*; ++import org.apache.ignite.internal.util.GridUtils; ++import org.apache.ignite.internal.util.lang.*; ++import org.apache.ignite.lang.*; ++import org.apache.ignite.testframework.*; ++import org.apache.ignite.transactions.*; ++ ++import javax.cache.expiry.*; ++import java.util.*; ++import java.util.concurrent.*; + + /** + * Cache metrics test. 
+ */ + public abstract class GridCacheAbstractMetricsSelfTest extends GridCacheAbstractSelfTest { + /** */ + private static final int KEY_CNT = 50; + + /** {@inheritDoc} */ + @Override protected boolean swapEnabled() { + return false; + } + + /** + * @return Key count. + */ + protected int keyCount() { + return KEY_CNT; + } + + /** + * Gets number of inner reads per "put" operation. + * + * @param isPrimary {@code true} if local node is primary for current key, {@code false} otherwise. + * @return Expected number of inner reads. + */ + protected int expectedReadsPerPut(boolean isPrimary) { + return isPrimary ? 1 : 2; + } + + /** + * Gets number of missed per "put" operation. + * + * @param isPrimary {@code true} if local node is primary for current key, {@code false} otherwise. + * @return Expected number of misses. + */ + protected int expectedMissesPerPut(boolean isPrimary) { + return isPrimary ? 1 : 2; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + for (int i = 0; i < gridCount(); i++) { + Ignite g = grid(i); + + g.cache(null).removeAll(); + + assert g.cache(null).isEmpty(); + - g.cache(null).resetMetrics(); ++ g.cache(null).mxBean().clear(); + + g.transactions().resetMetrics(); + } + } + ++ /** {@inheritDoc} */ ++ @Override protected void beforeTest() throws Exception { ++ super.beforeTest(); ++ ++ for (int i = 0; i < gridCount(); i++) { ++ Ignite g = grid(i); ++ ++ g.cache(null).configuration().setStatisticsEnabled(true); ++ } ++ } ++ ++ /** ++ * @throws Exception If failed. ++ */ ++ public void testGetMetricsSnapshot() throws Exception { ++ IgniteCache<Object, Object> cache = grid(0).jcache(null); ++ ++ assertNotSame("Method metrics() should return snapshot.", cache.metrics(), cache.metrics()); ++ } ++ ++ /** ++ * @throws Exception If failed. ++ */ ++ public void testRemoveAsyncAvgTime() throws Exception { ++ GridCache<Object, Object> cache = grid(0).cache(null); ++ ++ cache.putx(1, 1); ++ cache.putx(2, 2); ++ ++ assertEquals(cache.metrics().getAverageRemoveTime(), 0.0, 0.0); ++ ++ IgniteFuture<Object> fut = cache.removeAsync(1); ++ ++ assertEquals(1, (int)fut.get()); ++ ++ assert cache.metrics().getAverageRemoveTime() > 0; ++ ++ fut = cache.removeAsync(2); ++ ++ assertEquals(2, (int)fut.get()); ++ ++ assert cache.metrics().getAverageRemoveTime() > 0; ++ } ++ ++ /** ++ * @throws Exception If failed. ++ */ ++ public void testRemoveAsyncValAvgTime() throws Exception { ++ GridCache<Object, Object> cache = grid(0).cache(null); ++ ++ Integer key = 0; ++ ++ for (int i = 0; i < 1000; i++) { ++ if (cache.affinity().isPrimary(grid(0).localNode(), i)) { ++ key = i; ++ ++ break; ++ } ++ } ++ ++ assertEquals(cache.metrics().getAverageRemoveTime(), 0.0, 0.0); ++ ++ cache.put(key, key); ++ ++ IgniteFuture<Boolean> fut = cache.removeAsync(key, key); ++ ++ assertTrue(fut.get()); ++ ++ assert cache.metrics().getAverageRemoveTime() >= 0; ++ } ++ + /** + * @throws Exception If failed. 
+ */ - public void testWritesReads() throws Exception { ++ public void testRemoveAvgTime() throws Exception { ++ IgniteCache<Integer, Integer> jcache = grid(0).jcache(null); ++ GridCache<Object, Object> cache = grid(0).cache(null); ++ ++ jcache.put(1, 1); ++ jcache.put(2, 2); ++ ++ assertEquals(cache.metrics().getAverageRemoveTime(), 0.0, 0.0); ++ ++ jcache.remove(1); ++ ++ float avgRmvTime = cache.metrics().getAverageRemoveTime(); ++ ++ assert avgRmvTime > 0; ++ ++ jcache.remove(2); ++ ++ assert cache.metrics().getAverageRemoveTime() > 0; ++ } ++ ++ /** ++ * @throws Exception If failed. ++ */ ++ public void testRemoveAllAvgTime() throws Exception { ++ IgniteCache<Integer, Integer> jcache = grid(0).jcache(null); ++ GridCache<Object, Object> cache = grid(0).cache(null); ++ ++ jcache.put(1, 1); ++ jcache.put(2, 2); ++ jcache.put(3, 3); ++ ++ assertEquals(cache.metrics().getAverageRemoveTime(), 0.0, 0.0); ++ ++ Set<Integer> keys = new HashSet<>(4, 1); ++ keys.add(1); ++ keys.add(2); ++ keys.add(3); ++ ++ jcache.removeAll(keys); ++ ++ float averageRemoveTime = cache.metrics().getAverageRemoveTime(); ++ ++ assert averageRemoveTime >= 0; ++ } ++ ++ /** ++ * @throws Exception If failed. ++ */ ++ public void testRemoveAllAsyncAvgTime() throws Exception { ++ GridCache<Object, Object> cache = grid(0).cache(null); ++ ++ Set<Integer> keys = new LinkedHashSet<>(); ++ ++ for (int i = 0; i < 1000; i++) { ++ if (cache.affinity().isPrimary(grid(0).localNode(), i)) { ++ keys.add(i); ++ ++ cache.put(i, i); ++ ++ if(keys.size() == 3) ++ break; ++ } ++ } ++ ++ assertEquals(cache.metrics().getAverageRemoveTime(), 0.0, 0.0); ++ ++ IgniteFuture<?> fut = cache.removeAllAsync(keys); ++ ++ fut.get(); ++ ++ assert cache.metrics().getAverageRemoveTime() >= 0; ++ } ++ ++ ++ /** ++ * @throws Exception If failed. ++ */ ++ public void testGetAvgTime() throws Exception { ++ IgniteCache<Integer, Integer> jcache = grid(0).jcache(null); ++ GridCache<Object, Object> cache = grid(0).cache(null); ++ ++ jcache.put(1, 1); ++ ++ assertEquals(0.0, cache.metrics().getAverageGetTime(), 0.0); ++ ++ jcache.get(1); ++ ++ float averageGetTime = cache.metrics().getAverageGetTime(); ++ ++ assert averageGetTime > 0; ++ ++ jcache.get(2); ++ ++ assert cache.metrics().getAverageGetTime() > 0; ++ } ++ ++ /** ++ * @throws Exception If failed. ++ */ ++ public void testGetAllAvgTime() throws Exception { ++ IgniteCache<Integer, Integer> jcache = grid(0).jcache(null); ++ GridCache<Object, Object> cache = grid(0).cache(null); ++ ++ assertEquals(0.0, cache.metrics().getAverageGetTime(), 0.0); ++ ++ jcache.put(1, 1); ++ jcache.put(2, 2); ++ jcache.put(3, 3); ++ ++ assertEquals(0.0, cache.metrics().getAverageGetTime(), 0.0); ++ ++ Set<Integer> keys = new TreeSet<>(); ++ keys.add(1); ++ keys.add(2); ++ keys.add(3); ++ ++ jcache.getAll(keys); ++ ++ assert cache.metrics().getAverageGetTime() > 0; ++ } ++ ++ /** ++ * @throws Exception If failed. 
++ */ ++ public void testGetAllAsyncAvgTime() throws Exception { ++ GridCache<Object, Object> cache = grid(0).cache(null); ++ ++ assertEquals(0.0, cache.metrics().getAverageGetTime(), 0.0); ++ ++ cache.putx(1, 1); ++ cache.putx(2, 2); ++ cache.putx(3, 3); ++ ++ assertEquals(0.0, cache.metrics().getAverageGetTime(), 0.0); ++ ++ Set<Integer> keys = new TreeSet<>(); ++ keys.add(1); ++ keys.add(2); ++ keys.add(3); ++ ++ IgniteFuture<Map<Object, Object>> fut = cache.getAllAsync(keys); ++ ++ fut.get(); ++ ++ TimeUnit.MILLISECONDS.sleep(100L); ++ ++ assert cache.metrics().getAverageGetTime() > 0; ++ } ++ ++ /** ++ * @throws Exception If failed. ++ */ ++ public void testPutAvgTime() throws Exception { ++ IgniteCache<Integer, Integer> jcache = grid(0).jcache(null); ++ GridCache<Object, Object> cache = grid(0).cache(null); ++ ++ assertEquals(0.0, cache.metrics().getAveragePutTime(), 0.0); ++ assertEquals(0, cache.metrics().getCachePuts()); ++ ++ jcache.put(1, 1); ++ ++ float avgPutTime = cache.metrics().getAveragePutTime(); ++ ++ assert avgPutTime >= 0; ++ ++ assertEquals(1, cache.metrics().getCachePuts()); ++ ++ jcache.put(2, 2); ++ ++ assert cache.metrics().getAveragePutTime() >= 0; ++ } ++ ++ /** ++ * @throws Exception If failed. ++ */ ++ public void testPutxAsyncAvgTime() throws Exception { ++ GridCache<Object, Object> cache = grid(0).cache(null); ++ ++ assertEquals(0.0, cache.metrics().getAveragePutTime(), 0.0); ++ assertEquals(0, cache.metrics().getCachePuts()); ++ ++ IgniteFuture<Boolean> fut = cache.putxAsync(1, 1); ++ ++ fut.get(); ++ ++ TimeUnit.MILLISECONDS.sleep(100L); ++ ++ assert cache.metrics().getAveragePutTime() > 0; ++ } ++ ++ /** ++ * @throws Exception If failed. ++ */ ++ public void testPutAsyncAvgTime() throws Exception { ++ GridCache<Object, Object> cache = grid(0).cache(null); ++ ++ Integer key = null; ++ ++ for (int i = 0; i < 1000; i++) { ++ if (cache.affinity().isPrimary(grid(0).localNode(), i)) { ++ key = i; ++ ++ break; ++ } ++ } ++ ++ assertEquals(0.0, cache.metrics().getAveragePutTime(), 0.0); ++ assertEquals(0.0, cache.metrics().getAverageGetTime(), 0.0); ++ ++ IgniteFuture<?> fut = cache.putAsync(key, key); ++ ++ fut.get(); ++ ++ TimeUnit.MILLISECONDS.sleep(100L); ++ ++ assert cache.metrics().getAveragePutTime() > 0; ++ assert cache.metrics().getAverageGetTime() > 0; ++ } ++ ++ /** ++ * @throws Exception If failed. ++ */ ++ public void testPutxIfAbsentAsyncAvgTime() throws Exception { ++ GridCache<Object, Object> cache = grid(0).cache(null); ++ ++ Integer key = null; ++ ++ for (int i = 0; i < 1000; i++) { ++ if (cache.affinity().isPrimary(grid(0).localNode(), i)) { ++ key = i; ++ ++ break; ++ } ++ } ++ ++ assertEquals(0.0f, cache.metrics().getAveragePutTime()); ++ ++ IgniteFuture<Boolean> fut = cache.putxIfAbsentAsync(key, key); ++ ++ fut.get(); ++ ++ TimeUnit.MILLISECONDS.sleep(100L); ++ ++ assert cache.metrics().getAveragePutTime() > 0; ++ } ++ ++ /** ++ * @throws Exception If failed. ++ */ ++ public void testPutIfAbsentAsyncAvgTime() throws Exception { ++ GridCache<Object, Object> cache = grid(0).cache(null); ++ ++ Integer key = null; ++ ++ for (int i = 0; i < 1000; i++) { ++ if (cache.affinity().isPrimary(grid(0).localNode(), i)) { ++ key = i; ++ ++ break; ++ } ++ } ++ ++ assertEquals(0.0f, cache.metrics().getAveragePutTime()); ++ ++ IgniteFuture<?> fut = cache.putIfAbsentAsync(key, key); ++ ++ fut.get(); ++ ++ TimeUnit.MILLISECONDS.sleep(100L); ++ ++ assert cache.metrics().getAveragePutTime() > 0; ++ } ++ ++ /** ++ * @throws Exception If failed. 
++  */
++ public void testPutAllAvgTime() throws Exception {
++     IgniteCache<Integer, Integer> jcache = grid(0).jcache(null);
++     GridCache<Object, Object> cache = grid(0).cache(null);
++
++     assertEquals(0.0, cache.metrics().getAveragePutTime(), 0.0);
++     assertEquals(0, cache.metrics().getCachePuts());
++
++     Map<Integer, Integer> values = new HashMap<>();
++
++     values.put(1, 1);
++     values.put(2, 2);
++     values.put(3, 3);
++
++     jcache.putAll(values);
++
++     float averagePutTime = cache.metrics().getAveragePutTime();
++
++     assert averagePutTime >= 0;
++     assertEquals(values.size(), cache.metrics().getCachePuts());
++ }
++
++ /**
++  * @throws Exception If failed.
++  */
++ public void testPutsReads() throws Exception {
+     GridCache<Integer, Integer> cache0 = grid(0).cache(null);
+
+     int keyCnt = keyCount();
+
+     int expReads = 0;
+     int expMisses = 0;
+
+     // Put and get a few keys.
+     for (int i = 0; i < keyCnt; i++) {
-         cache0.put(i, i); // +1 read
++         cache0.put(i, i); // +1 put
+
+         boolean isPrimary = cache0.affinity().isPrimary(grid(0).localNode(), i);
+
+         expReads += expectedReadsPerPut(isPrimary);
+         expMisses += expectedMissesPerPut(isPrimary);
+
-         info("Writes: " + cache0.metrics().writes());
++         info("Puts: " + cache0.metrics().getCachePuts());
+
+         for (int j = 0; j < gridCount(); j++) {
+             GridCache<Integer, Integer> cache = grid(j).cache(null);
+
-             int cacheWrites = cache.metrics().writes();
++             int cacheWrites = (int)cache.metrics().getCachePuts();
+
+             assertEquals("Wrong cache metrics [i=" + i + ", grid=" + j + ']', i + 1, cacheWrites);
+         }
+
+         assertEquals("Wrong value for key: " + i, Integer.valueOf(i), cache0.get(i)); // +1 read
+
+         expReads++;
+     }
+
+     // Check metrics for the whole cache.
-     long writes = 0;
-     long reads = 0;
-     long hits = 0;
-     long misses = 0;
++     int puts = 0;
++     int reads = 0;
++     int hits = 0;
++     int misses = 0;
+
+     for (int i = 0; i < gridCount(); i++) {
+         CacheMetrics m = grid(i).cache(null).metrics();
+
-         writes += m.writes();
-         reads += m.reads();
-         hits += m.hits();
-         misses += m.misses();
++         puts += m.getCachePuts();
++         reads += m.getCacheGets();
++         hits += m.getCacheHits();
++         misses += m.getCacheMisses();
+     }
+
+     info("Stats [reads=" + reads + ", hits=" + hits + ", misses=" + misses + ']');
+
-     assertEquals(keyCnt * gridCount(), writes);
++     assertEquals(keyCnt * gridCount(), puts);
+     assertEquals(expReads, reads);
+     assertEquals(keyCnt, hits);
+     assertEquals(expMisses, misses);
+ }
+
+ /**
+  * @throws Exception If failed.
+  */
++ public void testMissHitPercentage() throws Exception {
++     GridCache<Integer, Integer> cache0 = grid(0).cache(null);
++
++     int keyCnt = keyCount();
++
++     // Put and get a few keys.
++     for (int i = 0; i < keyCnt; i++) {
++         cache0.put(i, i); // +1 put
++
++         info("Puts: " + cache0.metrics().getCachePuts());
++
++         for (int j = 0; j < gridCount(); j++) {
++             GridCache<Integer, Integer> cache = grid(j).cache(null);
++
++             long cacheWrites = cache.metrics().getCachePuts();
++
++             assertEquals("Wrong cache metrics [i=" + i + ", grid=" + j + ']', i + 1, cacheWrites);
++         }
++
++         assertEquals("Wrong value for key: " + i, Integer.valueOf(i), cache0.get(i)); // +1 read
++     }
++
++     // Check metrics for the whole cache.
++     for (int i = 0; i < gridCount(); i++) {
++         CacheMetrics m = grid(i).cache(null).metrics();
++
++         assertEquals(m.getCacheHits() * 100f / m.getCacheGets(), m.getCacheHitPercentage(), 0.1f);
++         assertEquals(m.getCacheMisses() * 100f / m.getCacheGets(), m.getCacheMissPercentage(), 0.1f);
++     }
++ }
++
++ /**
++  * @throws Exception If failed.
++  */
+ public void testMisses() throws Exception {
+     GridCache<Integer, Integer> cache = grid(0).cache(null);
+
-     // TODO: GG-7578.
-     if (cache.configuration().getCacheMode() == CacheMode.REPLICATED)
-         return;
-
+     int keyCnt = keyCount();
+
+     int expReads = 0;
+
+     // Get a few missed keys.
+     for (int i = 0; i < keyCnt; i++) {
+         assertNull("Value is not null for key: " + i, cache.get(i));
+
-         if (cache.affinity().isPrimary(grid(0).localNode(), i))
++         if (cache.configuration().getCacheMode() == CacheMode.REPLICATED ||
++             cache.affinity().isPrimary(grid(0).localNode(), i))
+             expReads++;
+         else
+             expReads += 2;
+     }
+
+     // Check metrics for the whole cache.
-     long writes = 0;
++     long puts = 0;
+     long reads = 0;
+     long hits = 0;
+     long misses = 0;
+
+     for (int i = 0; i < gridCount(); i++) {
+         CacheMetrics m = grid(i).cache(null).metrics();
+
-         writes += m.writes();
-         reads += m.reads();
-         hits += m.hits();
-         misses += m.misses();
++         puts += m.getCachePuts();
++         reads += m.getCacheGets();
++         hits += m.getCacheHits();
++         misses += m.getCacheMisses();
+     }
+
-     assertEquals(0, writes);
++     assertEquals(0, puts);
+     assertEquals(expReads, reads);
+     assertEquals(0, hits);
+     assertEquals(expReads, misses);
+ }
+
+ /**
+  * @throws Exception If failed.
+  */
+ public void testMissesOnEmptyCache() throws Exception {
+     GridCache<Integer, Integer> cache = grid(0).cache(null);
+
-     // TODO: GG-7578.
-     if (cache.configuration().getCacheMode() == CacheMode.REPLICATED)
-         return;
++     assertEquals("Expected 0 read", 0, cache.metrics().getCacheGets());
++     assertEquals("Expected 0 miss", 0, cache.metrics().getCacheMisses());
+
+     Integer key = null;
+
+     for (int i = 0; i < 1000; i++) {
+         if (cache.affinity().isPrimary(grid(0).localNode(), i)) {
+             key = i;
+
+             break;
+         }
+     }
+
+     assertNotNull(key);
+
+     cache.get(key);
+
-     assertEquals("Expected 1 read", 1, cache.metrics().reads());
-     assertEquals("Expected 1 miss", 1, cache.metrics().misses());
++     assertEquals("Expected 1 read", 1, cache.metrics().getCacheGets());
++     assertEquals("Expected 1 miss", 1, cache.metrics().getCacheMisses());
+
+     cache.put(key, key); // +1 read, +1 miss.
+
+     cache.get(key);
+
-     assertEquals("Expected 1 write", 1, cache.metrics().writes());
-     assertEquals("Expected 3 reads", 3, cache.metrics().reads());
-     assertEquals("Expected 2 misses", 2, cache.metrics().misses());
-     assertEquals("Expected 1 hit", 1, cache.metrics().hits());
++     assertEquals("Expected 1 write", 1, cache.metrics().getCachePuts());
++     assertEquals("Expected 3 reads", 3, cache.metrics().getCacheGets());
++     assertEquals("Expected 2 misses", 2, cache.metrics().getCacheMisses());
++     assertEquals("Expected 1 hit", 1, cache.metrics().getCacheHits());
++ }
++
++ /**
++  * @throws Exception If failed.
++  */
++ public void testRemoves() throws Exception {
++     GridCache<Integer, Integer> cache = grid(0).cache(null);
++
++     cache.put(1, 1);
++
++     // +1 remove
++     cache.remove(1);
++
++     assertEquals(1L, cache.metrics().getCacheRemovals());
++ }
++
++ /**
++  * @throws Exception If failed.
++  */
++ public void testManualEvictions() throws Exception {
++     GridCache<Integer, Integer> cache = grid(0).cache(null);
++
++     if (cache.configuration().getCacheMode() == CacheMode.PARTITIONED)
++         return;
++
++     cache.put(1, 1);
++
++     cache.evict(1);
++
++     assertEquals(0L, cache.metrics().getCacheRemovals());
++     assertEquals(1L, cache.metrics().getCacheEvictions());
++ }
++
++ /**
++  * @throws Exception If failed.
++  */
++ public void testTxEvictions() throws Exception {
++     if (grid(0).cache(null).configuration().getAtomicityMode() != CacheAtomicityMode.ATOMIC)
++         checkTtl(true);
++ }
++
++ /**
++  * @throws Exception If failed.
++  */
++ public void testNonTxEvictions() throws Exception {
++     if (grid(0).cache(null).configuration().getAtomicityMode() == CacheAtomicityMode.ATOMIC)
++         checkTtl(false);
++ }
++
++ /**
++  * @param inTx {@code True} to perform updates within a transaction.
++  * @throws Exception If failed.
++  */
++ private void checkTtl(boolean inTx) throws Exception {
++     int ttl = 1000;
++
++     final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, ttl));
++
++     final GridCache<Integer, Integer> c = grid(0).cache(null);
++
++     final Integer key = primaryKeysForCache(c, 1, 0).get(0);
++
++     c.put(key, 1);
++
++     CacheEntry<Integer, Integer> entry = c.entry(key);
++
++     assert entry != null;
++
++     assertEquals(0, entry.timeToLive());
++     assertEquals(0, entry.expirationTime());
++     assertEquals(0, grid(0).cache(null).metrics().getCacheEvictions());
++
++     long startTime = System.currentTimeMillis();
++
++     if (inTx) {
++         // Rollback transaction for the first time.
++         IgniteTx tx = grid(0).transactions().txStart();
++
++         try {
++             grid(0).jcache(null).withExpiryPolicy(expiry).put(key, 1);
++         }
++         finally {
++             tx.rollback();
++         }
++
++         assertEquals(0, entry.timeToLive());
++         assertEquals(0, entry.expirationTime());
++     }
++
++     // Now commit transaction and check that ttl and expire time have been saved.
++     IgniteTx tx = inTx ? c.txStart() : null;
++
++     try {
++         grid(0).jcache(null).withExpiryPolicy(expiry).put(key, 1);
++     }
++     finally {
++         if (tx != null)
++             tx.commit();
++     }
++
++     long[] expireTimes = new long[gridCount()];
++
++     for (int i = 0; i < gridCount(); i++) {
++         CacheEntry<Object, Object> curEntry = grid(i).cache(null).entry(key);
++
++         if (curEntry.primary() || curEntry.backup()) {
++             assertEquals(ttl, curEntry.timeToLive());
++
++             assert curEntry.expirationTime() > startTime;
++
++             expireTimes[i] = curEntry.expirationTime();
++         }
++     }
++
++     // One more update from the same cache entry to ensure that expire time is shifted forward.
++     GridUtils.sleep(100);
++
++     tx = inTx ? c.txStart() : null;
++
++     try {
++         grid(0).jcache(null).withExpiryPolicy(expiry).put(key, 2);
++     }
++     finally {
++         if (tx != null)
++             tx.commit();
++     }
++
++     for (int i = 0; i < gridCount(); i++) {
++         CacheEntry<Object, Object> curEntry = grid(i).cache(null).entry(key);
++
++         if (curEntry.primary() || curEntry.backup()) {
++             assertEquals(ttl, curEntry.timeToLive());
++
++             assert curEntry.expirationTime() > expireTimes[i];
++
++             expireTimes[i] = curEntry.expirationTime();
++         }
++     }
++
++     // And one more direct update to ensure that expire time is shifted forward.
++     GridUtils.sleep(100);
++
++     assertEquals(0, grid(0).cache(null).metrics().getCacheEvictions());
++
++     tx = inTx ? c.txStart() : null;
++
++     try {
++         grid(0).jcache(null).withExpiryPolicy(expiry).put(key, 3);
++     }
++     finally {
++         if (tx != null)
++             tx.commit();
++     }
++
++     for (int i = 0; i < gridCount(); i++) {
++         CacheEntry<Object, Object> curEntry = grid(i).cache(null).entry(key);
++
++         if (curEntry.primary() || curEntry.backup()) {
++             assertEquals(ttl, curEntry.timeToLive());
++
++             assert curEntry.expirationTime() > expireTimes[i];
++
++             expireTimes[i] = curEntry.expirationTime();
++         }
++     }
++
++     // And one more update to ensure that ttl is not changed and expire time is not shifted forward.
++     GridUtils.sleep(100);
++
++     assertEquals(0, grid(0).cache(null).metrics().getCacheEvictions());
++
++     log.info("Put 4");
++
++     tx = inTx ? c.txStart() : null;
++
++     try {
++         grid(0).jcache(null).put(key, 4);
++     }
++     finally {
++         if (tx != null)
++             tx.commit();
++     }
++
++     log.info("Put 4 done");
++
++     for (int i = 0; i < gridCount(); i++) {
++         CacheEntry<Object, Object> curEntry = grid(i).cache(null).entry(key);
++
++         if (curEntry.primary() || curEntry.backup()) {
++             assertEquals(ttl, curEntry.timeToLive());
++             assertEquals(expireTimes[i], curEntry.expirationTime());
++         }
++     }
++
++     assertEquals(0, grid(0).cache(null).metrics().getCacheEvictions());
++
++     // Avoid reloading from store.
++     map.remove(key);
++
++     assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() {
++         @SuppressWarnings("unchecked")
++         @Override public boolean applyx() throws IgniteCheckedException {
++             try {
++                 if (c.get(key) != null)
++                     return false;
++
++                 // Get "cache" field from GridCacheProxyImpl.
++                 GridCacheAdapter c0 = GridTestUtils.getFieldValue(c, "cache");
++
++                 if (!c0.context().deferredDelete()) {
++                     GridCacheEntryEx e0 = c0.peekEx(key);
++
++                     return e0 == null || (e0.rawGet() == null && e0.valueBytes() == null);
++                 }
++                 else
++                     return true;
++             }
++             catch (GridCacheEntryRemovedException e) {
++                 throw new RuntimeException(e);
++             }
++         }
++     }, Math.min(ttl * 10, getTestTimeout())));
++
++     // Ensure that old TTL and expire time are no longer "visible".
++     entry = c.entry(key);
++
++     assertEquals(0, entry.timeToLive());
++     assertEquals(0, entry.expirationTime());
++
++     // Ensure that next update will not pick old expire time.
++     tx = inTx ? c.txStart() : null;
++
++     try {
++         entry.set(10);
++     }
++     finally {
++         if (tx != null)
++             tx.commit();
++     }
++
++     GridUtils.sleep(2000);
++
++     entry = c.entry(key);
++
++     assertEquals((Integer)10, entry.get());
++
++     assertEquals(0, entry.timeToLive());
++     assertEquals(0, entry.expirationTime());
++
++     if (c.configuration().getCacheMode() != CacheMode.PARTITIONED && inTx)
++         assertEquals(1, grid(0).cache(null).metrics().getCacheEvictions());
++ }
++
++ /**
++  * @param cache Cache.
++  * @param cnt Keys count.
++  * @param startFrom Start value for keys search.
++  * @return Collection of keys for which given cache is primary.
++  * @throws IgniteCheckedException If failed.
++  */
++ protected List<Integer> primaryKeysForCache(CacheProjection<Integer, Integer> cache, int cnt, int startFrom)
++     throws IgniteCheckedException {
++     List<Integer> found = new ArrayList<>(cnt);
++
++     for (int i = startFrom; i < startFrom + 100_000; i++) {
++         if (cache.entry(i).primary()) {
++             found.add(i);
++
++             if (found.size() == cnt)
++                 return found;
++         }
++     }
++
++     throw new IgniteCheckedException("Unable to find " + cnt + " keys as primary for cache.");
+ }
+ }