http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index e7ce96f,0c9ea00..d410521 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@@ -259,11 -244,11 +259,11 @@@ class GridDhtPartitionTopologyImpl<K, V } } - if (cctx.preloadEnabled()) { + if (cctx.rebalanceEnabled()) { for (int p = 0; p < num; p++) { // If this is the first node in grid. - if (oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId())) { - assert exchId.isJoined(); + if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId())) || exchId.isCacheAdded(cctx.cacheId())) { + assert exchId.isJoined() || exchId.isCacheAdded(cctx.cacheId()); try { GridDhtLocalPartition locPart = localPartition(p, topVer, true, false); @@@ -676,8 -658,8 +676,8 @@@ } /** {@inheritDoc} */ - @Override public List<ClusterNode> owners(int p, long topVer) { + @Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) { - if (!cctx.preloadEnabled()) + if (!cctx.rebalanceEnabled()) return ownersAndMoving(p, topVer); return nodes(p, topVer, OWNING); @@@ -690,10 -672,10 +690,10 @@@ /** {@inheritDoc} */ @Override public List<ClusterNode> moving(int p) { - if (!cctx.preloadEnabled()) + if (!cctx.rebalanceEnabled()) - return ownersAndMoving(p, -1); + return ownersAndMoving(p, AffinityTopologyVersion.NONE); - return nodes(p, -1, MOVING); + return nodes(p, AffinityTopologyVersion.NONE, MOVING); } /**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index b566720,19279d4..5edddd0 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@@ -1213,9 -1212,9 +1213,9 @@@ public final class GridDhtTxPrepareFutu } } - long topVer = tx.topologyVersion(); + AffinityTopologyVersion topVer = tx.topologyVersion(); - boolean rec = cctx.gridEvents().isRecordable(EVT_CACHE_PRELOAD_OBJECT_LOADED); + boolean rec = cctx.gridEvents().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED); for (GridCacheEntryInfo info : res.preloadEntries()) { GridCacheContext<K, V> cacheCtx = cctx.cacheContext(info.cacheId()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 144ed7a,0f2443e..3ba08a3 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@@ -207,10 -206,10 +207,10 @@@ public class GridDhtPartitionDemandPool if (exchFut != null) { if (log.isDebugEnabled()) - log.debug("Forcing preload event for future: " + exchFut); + log.debug("Forcing rebalance event for future: " + exchFut); - exchFut.listen(new CI1<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> t) { + exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { cctx.shared().exchange().forcePreloadExchange(exchFut); } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 5690aab,347489c..24e421d --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@@ -18,9 -18,8 +18,10 @@@ package org.apache.ignite.internal.processors.cache.transactions; import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.*; + import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 0000000,dd8df35..dde98c6 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@@ -1,0 -1,1403 +1,1403 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.datastreamer; + + import org.apache.ignite.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.events.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.cluster.*; + import org.apache.ignite.internal.managers.communication.*; + import org.apache.ignite.internal.managers.deployment.*; + import org.apache.ignite.internal.managers.eventstorage.*; + import org.apache.ignite.internal.processors.affinity.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.processors.cache.distributed.dht.*; + import org.apache.ignite.internal.processors.cache.version.*; + import org.apache.ignite.internal.processors.cacheobject.*; + import org.apache.ignite.internal.processors.dr.*; + import org.apache.ignite.internal.util.*; + import org.apache.ignite.internal.util.future.*; + import org.apache.ignite.internal.util.lang.*; + import org.apache.ignite.internal.util.tostring.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.lang.*; + import org.jdk8.backport.*; + import org.jetbrains.annotations.*; + + import java.util.*; + import java.util.Map.*; + import java.util.concurrent.*; + import java.util.concurrent.atomic.*; + + import static org.apache.ignite.events.EventType.*; + import static org.apache.ignite.internal.GridTopic.*; + import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; + + /** + * Data streamer implementation. + */ + @SuppressWarnings("unchecked") + public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed { + /** Isolated updater. */ + private static final Updater ISOLATED_UPDATER = new IsolatedUpdater(); + + /** Cache updater. */ + private Updater<K, V> updater = ISOLATED_UPDATER; + + /** */ + private byte[] updaterBytes; + + /** Max remap count before issuing an error. */ + private static final int DFLT_MAX_REMAP_CNT = 32; + + /** Log reference. */ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** Logger. */ + private static IgniteLogger log; + + /** Cache name ({@code null} for default cache). */ + private final String cacheName; + + + /** Per-node buffer size. */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private int bufSize = DFLT_PER_NODE_BUFFER_SIZE; + + /** */ + private int parallelOps = DFLT_MAX_PARALLEL_OPS; + + /** */ + private long autoFlushFreq; + + /** Mapping. */ + @GridToStringInclude + private ConcurrentMap<UUID, Buffer> bufMappings = new ConcurrentHashMap8<>(); + + /** Discovery listener. */ + private final GridLocalEventListener discoLsnr; + + /** Context. */ + private final GridKernalContext ctx; + + /** */ + private final IgniteCacheObjectProcessor cacheObjProc; + + /** */ + private final CacheObjectContext cacheObjCtx; + + /** Communication topic for responses. */ + private final Object topic; + + /** */ + private byte[] topicBytes; + + /** {@code True} if data loader has been cancelled. */ + private volatile boolean cancelled; + + /** Active futures of this data loader. */ + @GridToStringInclude + private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>(); + + /** Closure to remove from active futures. */ + @GridToStringExclude + private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> t) { + boolean rmv = activeFuts.remove(t); + + assert rmv; + } + }; + + /** Job peer deploy aware. */ + private volatile GridPeerDeployAware jobPda; + + /** Deployment class. */ + private Class<?> depCls; + + /** Future to track loading finish. */ + private final GridFutureAdapter<?> fut; + + /** Public API future to track loading finish. */ + private final IgniteFuture<?> publicFut; + + /** Busy lock. */ + private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + + /** Closed flag. */ + private final AtomicBoolean closed = new AtomicBoolean(); + + /** */ + private volatile long lastFlushTime = U.currentTimeMillis(); + + /** */ + private final DelayQueue<DataStreamerImpl<K, V>> flushQ; + + /** */ + private boolean skipStore; + + /** */ + private int maxRemapCnt = DFLT_MAX_REMAP_CNT; + + /** Whether a warning at {@link DataStreamerImpl#allowOverwrite()} printed */ + private static boolean isWarningPrinted; + + /** + * @param ctx Grid kernal context. + * @param cacheName Cache name. + * @param flushQ Flush queue. + */ + public DataStreamerImpl( + final GridKernalContext ctx, + @Nullable final String cacheName, + DelayQueue<DataStreamerImpl<K, V>> flushQ + ) { + assert ctx != null; + + this.ctx = ctx; + this.cacheObjProc = ctx.cacheObjects(); + + if (log == null) + log = U.logger(ctx, logRef, DataStreamerImpl.class); + + ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes()); + + if (node == null) + throw new IllegalStateException("Cache doesn't exist: " + cacheName); + - this.cacheObjCtx = ctx.cacheObjects().contextForCache(node, cacheName); ++ this.cacheObjCtx = ctx.cacheObjects().contextForCache(node, cacheName, null); + this.cacheName = cacheName; + this.flushQ = flushQ; + + discoLsnr = new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; + + DiscoveryEvent discoEvt = (DiscoveryEvent)evt; + + UUID id = discoEvt.eventNode().id(); + + // Remap regular mappings. + final Buffer buf = bufMappings.remove(id); + + if (buf != null) { + // Only async notification is possible since + // discovery thread may be trapped otherwise. + ctx.closure().callLocalSafe( + new Callable<Object>() { + @Override public Object call() throws Exception { + buf.onNodeLeft(); + + return null; + } + }, + true /* system pool */ + ); + } + } + }; + + ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT); + + // Generate unique topic for this loader. + topic = TOPIC_DATASTREAM.topic(IgniteUuid.fromUuid(ctx.localNodeId())); + + ctx.io().addMessageListener(topic, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + assert msg instanceof DataStreamerResponse; + + DataStreamerResponse res = (DataStreamerResponse)msg; + + if (log.isDebugEnabled()) + log.debug("Received data load response: " + res); + + Buffer buf = bufMappings.get(nodeId); + + if (buf != null) + buf.onResponse(res); + + else if (log.isDebugEnabled()) + log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", "); + } + }); + + if (log.isDebugEnabled()) + log.debug("Added response listener within topic: " + topic); + + fut = new DataStreamerFuture(this); + + publicFut = new IgniteFutureImpl<>(fut); + } + + /** + * @return Cache object context. + */ + public CacheObjectContext cacheObjectContext() { + return cacheObjCtx; + } + + /** + * Enters busy lock. + */ + private void enterBusy() { + if (!busyLock.enterBusy()) + throw new IllegalStateException("Data streamer has been closed."); + } + + /** + * Leaves busy lock. + */ + private void leaveBusy() { + busyLock.leaveBusy(); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> future() { + return publicFut; + } + + /** + * @return Internal future. + */ + public IgniteInternalFuture<?> internalFuture() { + return fut; + } + + /** {@inheritDoc} */ + @Override public void deployClass(Class<?> depCls) { + this.depCls = depCls; + } + + /** {@inheritDoc} */ + @Override public void updater(Updater<K, V> updater) { + A.notNull(updater, "updater"); + + this.updater = updater; + } + + /** {@inheritDoc} */ + @Override public boolean allowOverwrite() { + return updater != ISOLATED_UPDATER; + } + + /** {@inheritDoc} */ + @Override public void allowOverwrite(boolean allow) { + if (allow == allowOverwrite()) + return; + + ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes()); + + if (node == null) + throw new IgniteException("Failed to get node for cache: " + cacheName); + + updater = allow ? DataStreamerCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER; + } + + /** {@inheritDoc} */ + @Override public boolean skipStore() { + return skipStore; + } + + /** {@inheritDoc} */ + @Override public void skipStore(boolean skipStore) { + this.skipStore = skipStore; + } + + /** {@inheritDoc} */ + @Override @Nullable public String cacheName() { + return cacheName; + } + + /** {@inheritDoc} */ + @Override public int perNodeBufferSize() { + return bufSize; + } + + /** {@inheritDoc} */ + @Override public void perNodeBufferSize(int bufSize) { + A.ensure(bufSize > 0, "bufSize > 0"); + + this.bufSize = bufSize; + } + + /** {@inheritDoc} */ + @Override public int perNodeParallelOperations() { + return parallelOps; + } + + /** {@inheritDoc} */ + @Override public void perNodeParallelOperations(int parallelOps) { + this.parallelOps = parallelOps; + } + + /** {@inheritDoc} */ + @Override public long autoFlushFrequency() { + return autoFlushFreq; + } + + /** {@inheritDoc} */ + @Override public void autoFlushFrequency(long autoFlushFreq) { + A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0"); + + long old = this.autoFlushFreq; + + if (autoFlushFreq != old) { + this.autoFlushFreq = autoFlushFreq; + + if (autoFlushFreq != 0 && old == 0) + flushQ.add(this); + else if (autoFlushFreq == 0) + flushQ.remove(this); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException { + A.notNull(entries, "entries"); + + return addData(entries.entrySet()); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) { + A.notEmpty(entries, "entries"); + + enterBusy(); + + try { + GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(); + + resFut.listen(rmvActiveFut); + + activeFuts.add(resFut); + + Collection<KeyCacheObject> keys = null; + + if (entries.size() > 1) { + keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1); + + for (Map.Entry<K, V> entry : entries) + keys.add(cacheObjProc.toCacheKeyObject(cacheObjCtx, entry.getKey(), true)); + } + + Collection<? extends DataStreamerEntry> entries0 = F.viewReadOnly(entries, new C1<Entry<K, V>, DataStreamerEntry>() { + @Override public DataStreamerEntry apply(Entry<K, V> e) { + KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, e.getKey(), true); + CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, e.getValue(), true); + + return new DataStreamerEntry(key, val); + } + }); + + load0(entries0, resFut, keys, 0); + + return new IgniteFutureImpl<>(resFut); + } + catch (IgniteException e) { + return new IgniteFinishedFutureImpl<>(e); + } + finally { + leaveBusy(); + } + } + + /** + * @param key Key. + * @param val Value. + * @return Future. + */ + public IgniteFuture<?> addDataInternal(KeyCacheObject key, CacheObject val) { + return addDataInternal(Collections.singleton(new DataStreamerEntry(key, val))); + } + + /** + * @param key Key. + * @return Future. + */ + public IgniteFuture<?> removeDataInternal(KeyCacheObject key) { + return addDataInternal(Collections.singleton(new DataStreamerEntry(key, null))); + } + + /** + * @param entries Entries. + * @return Future. + */ + public IgniteFuture<?> addDataInternal(Collection<? extends DataStreamerEntry> entries) { + enterBusy(); + + GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(); + + try { + resFut.listen(rmvActiveFut); + + activeFuts.add(resFut); + + Collection<KeyCacheObject> keys = null; + + if (entries.size() > 1) { + keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1); + + for (DataStreamerEntry entry : entries) + keys.add(entry.getKey()); + } + + load0(entries, resFut, keys, 0); + + return new IgniteFutureImpl<>(resFut); + } + catch (Throwable e) { + resFut.onDone(e); + + if (e instanceof Error) + throw e; + + return new IgniteFinishedFutureImpl<>(e); + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) { + A.notNull(entry, "entry"); + + return addData(F.asList(entry)); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(K key, V val) { + A.notNull(key, "key"); + + KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, key, true); + CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true); + + return addDataInternal(Collections.singleton(new DataStreamerEntry(key0, val0))); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> removeData(K key) { + return addData(key, null); + } + + /** + * @param entries Entries. + * @param resFut Result future. + * @param activeKeys Active keys. + * @param remaps Remaps count. + */ + private void load0( + Collection<? extends DataStreamerEntry> entries, + final GridFutureAdapter<Object> resFut, + @Nullable final Collection<KeyCacheObject> activeKeys, + final int remaps + ) { + assert entries != null; + + if (!isWarningPrinted) { + synchronized (this) { + if (!allowOverwrite() && !isWarningPrinted) { + U.warn(log, "Data streamer will not overwrite existing cache entries for better performance " + + "(to change, set allowOverwrite to true)"); + } + + isWarningPrinted = true; + } + } + + Map<ClusterNode, Collection<DataStreamerEntry>> mappings = new HashMap<>(); + + boolean initPda = ctx.deploy().enabled() && jobPda == null; + + for (DataStreamerEntry entry : entries) { + List<ClusterNode> nodes; + + try { + KeyCacheObject key = entry.getKey(); + + assert key != null; + + if (initPda) { + jobPda = new DataStreamerPda(key.value(cacheObjCtx, false), + entry.getValue() != null ? entry.getValue().value(cacheObjCtx, false) : null, + updater); + + initPda = false; + } + + nodes = nodes(key); + } + catch (IgniteCheckedException e) { + resFut.onDone(e); + + return; + } + + if (F.isEmpty(nodes)) { + resFut.onDone(new ClusterTopologyException("Failed to map key to node " + + "(no nodes with cache found in topology) [infos=" + entries.size() + + ", cacheName=" + cacheName + ']')); + + return; + } + + for (ClusterNode node : nodes) { + Collection<DataStreamerEntry> col = mappings.get(node); + + if (col == null) + mappings.put(node, col = new ArrayList<>()); + + col.add(entry); + } + } + + for (final Map.Entry<ClusterNode, Collection<DataStreamerEntry>> e : mappings.entrySet()) { + final UUID nodeId = e.getKey().id(); + + Buffer buf = bufMappings.get(nodeId); + + if (buf == null) { + Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey())); + + if (old != null) + buf = old; + } + + final Collection<DataStreamerEntry> entriesForNode = e.getValue(); + + IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> t) { + try { + t.get(); + + if (activeKeys != null) { + for (DataStreamerEntry e : entriesForNode) + activeKeys.remove(e.getKey()); + + if (activeKeys.isEmpty()) + resFut.onDone(); + } + else { + assert entriesForNode.size() == 1; + + // That has been a single key, + // so complete result future right away. + resFut.onDone(); + } + } + catch (IgniteCheckedException e1) { + if (log.isDebugEnabled()) + log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']'); + + if (cancelled) { + resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + + DataStreamerImpl.this, e1)); + } + else if (remaps + 1 > maxRemapCnt) { + resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): " + + remaps), e1); + } + else + load0(entriesForNode, resFut, activeKeys, remaps + 1); + } + } + }; + + GridFutureAdapter<?> f; + + try { + f = buf.update(entriesForNode, lsnr); + } + catch (IgniteInterruptedCheckedException e1) { + resFut.onDone(e1); + + return; + } + + if (ctx.discovery().node(nodeId) == null) { + if (bufMappings.remove(nodeId, buf)) + buf.onNodeLeft(); + + if (f != null) + f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " + + "(node has left): " + nodeId)); + } + } + } + + /** + * @param key Key to map. + * @return Nodes to send requests to. + * @throws IgniteCheckedException If failed. + */ + private List<ClusterNode> nodes(KeyCacheObject key) throws IgniteCheckedException { + GridAffinityProcessor aff = ctx.affinity(); + + return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, key) : + Collections.singletonList(aff.mapKeyToNode(cacheName, key)); + } + + /** + * Performs flush. + * + * @throws IgniteCheckedException If failed. + */ + private void doFlush() throws IgniteCheckedException { + lastFlushTime = U.currentTimeMillis(); + + List<IgniteInternalFuture> activeFuts0 = null; + + int doneCnt = 0; + + for (IgniteInternalFuture<?> f : activeFuts) { + if (!f.isDone()) { + if (activeFuts0 == null) + activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2)); + + activeFuts0.add(f); + } + else { + f.get(); + + doneCnt++; + } + } + + if (activeFuts0 == null || activeFuts0.isEmpty()) + return; + + while (true) { + Queue<IgniteInternalFuture<?>> q = null; + + for (Buffer buf : bufMappings.values()) { + IgniteInternalFuture<?> flushFut = buf.flush(); + + if (flushFut != null) { + if (q == null) + q = new ArrayDeque<>(bufMappings.size() * 2); + + q.add(flushFut); + } + } + + if (q != null) { + assert !q.isEmpty(); + + boolean err = false; + + for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) { + try { + fut.get(); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to flush buffer: " + e); + + err = true; + } + } + + if (err) + // Remaps needed - flush buffers. + continue; + } + + doneCnt = 0; + + for (int i = 0; i < activeFuts0.size(); i++) { + IgniteInternalFuture f = activeFuts0.get(i); + + if (f == null) + doneCnt++; + else if (f.isDone()) { + f.get(); + + doneCnt++; + + activeFuts0.set(i, null); + } + else + break; + } + + if (doneCnt == activeFuts0.size()) + return; + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("ForLoopReplaceableByForEach") + @Override public void flush() throws IgniteException { + enterBusy(); + + try { + doFlush(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + leaveBusy(); + } + } + + /** + * Flushes every internal buffer if buffer was flushed before passed in + * threshold. + * <p> + * Does not wait for result and does not fail on errors assuming that this method + * should be called periodically. + */ + @Override public void tryFlush() throws IgniteInterruptedException { + if (!busyLock.enterBusy()) + return; + + try { + for (Buffer buf : bufMappings.values()) + buf.flush(); + + lastFlushTime = U.currentTimeMillis(); + } + catch (IgniteInterruptedCheckedException e) { + throw U.convertException(e); + } + finally { + leaveBusy(); + } + } + + /** + * @param cancel {@code True} to close with cancellation. + * @throws IgniteException If failed. + */ + @Override public void close(boolean cancel) throws IgniteException { + try { + closeEx(cancel); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** + * @param cancel {@code True} to close with cancellation. + * @throws IgniteCheckedException If failed. + */ + public void closeEx(boolean cancel) throws IgniteCheckedException { + if (!closed.compareAndSet(false, true)) + return; + + busyLock.block(); + + if (log.isDebugEnabled()) + log.debug("Closing data streamer [ldr=" + this + ", cancel=" + cancel + ']'); + + IgniteCheckedException e = null; + + try { + // Assuming that no methods are called on this loader after this method is called. + if (cancel) { + cancelled = true; + + for (Buffer buf : bufMappings.values()) + buf.cancelAll(); + } + else + doFlush(); + + ctx.event().removeLocalEventListener(discoLsnr); + + ctx.io().removeMessageListener(topic); + } + catch (IgniteCheckedException e0) { + e = e0; + } + + fut.onDone(null, e); + + if (e != null) + throw e; + } + + /** + * @return {@code true} If the loader is closed. + */ + boolean isClosed() { + return fut.isDone(); + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteException { + close(false); + } + + /** + * @return Max remap count. + */ + public int maxRemapCount() { + return maxRemapCnt; + } + + /** + * @param maxRemapCnt New max remap count. + */ + public void maxRemapCount(int maxRemapCnt) { + this.maxRemapCnt = maxRemapCnt; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DataStreamerImpl.class, this); + } + + /** {@inheritDoc} */ + @Override public long getDelay(TimeUnit unit) { + return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + /** + * @return Next flush time. + */ + private long nextFlushTime() { + return lastFlushTime + autoFlushFreq; + } + + /** {@inheritDoc} */ + @Override public int compareTo(Delayed o) { + return nextFlushTime() > ((DataStreamerImpl)o).nextFlushTime() ? 1 : -1; + } + + /** + * + */ + private class Buffer { + /** Node. */ + private final ClusterNode node; + + /** Active futures. */ + private final Collection<IgniteInternalFuture<Object>> locFuts; + + /** Buffered entries. */ + private List<DataStreamerEntry> entries; + + /** */ + @GridToStringExclude + private GridFutureAdapter<Object> curFut; + + /** Local node flag. */ + private final boolean isLocNode; + + /** ID generator. */ + private final AtomicLong idGen = new AtomicLong(); + + /** Active futures. */ + private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs; + + /** */ + private final Semaphore sem; + + /** Closure to signal on task finish. */ + @GridToStringExclude + private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() { + @Override public void apply(IgniteInternalFuture<Object> t) { + signalTaskFinished(t); + } + }; + + /** + * @param node Node. + */ + Buffer(ClusterNode node) { + assert node != null; + + this.node = node; + + locFuts = new GridConcurrentHashSet<>(); + reqs = new ConcurrentHashMap8<>(); + + // Cache local node flag. + isLocNode = node.equals(ctx.discovery().localNode()); + + entries = newEntries(); + curFut = new GridFutureAdapter<>(); + curFut.listen(signalC); + + sem = new Semaphore(parallelOps); + } + + /** + * @param newEntries Infos. + * @param lsnr Listener for the operation future. + * @throws IgniteInterruptedCheckedException If failed. + * @return Future for operation. + */ + @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries, + IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException { + List<DataStreamerEntry> entries0 = null; + GridFutureAdapter<Object> curFut0; + + synchronized (this) { + curFut0 = curFut; + + curFut0.listen(lsnr); + + for (DataStreamerEntry entry : newEntries) + entries.add(entry); + + if (entries.size() >= bufSize) { + entries0 = entries; + + entries = newEntries(); + curFut = new GridFutureAdapter<>(); + curFut.listen(signalC); + } + } + + if (entries0 != null) { + submit(entries0, curFut0); + + if (cancelled) + curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this)); + } + + return curFut0; + } + + /** + * @return Fresh collection with some space for outgrowth. + */ + private List<DataStreamerEntry> newEntries() { + return new ArrayList<>((int)(bufSize * 1.2)); + } + + /** + * @return Future if any submitted. + * + * @throws IgniteInterruptedCheckedException If thread has been interrupted. + */ + @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException { + List<DataStreamerEntry> entries0 = null; + GridFutureAdapter<Object> curFut0 = null; + + synchronized (this) { + if (!entries.isEmpty()) { + entries0 = entries; + curFut0 = curFut; + + entries = newEntries(); + curFut = new GridFutureAdapter<>(); + curFut.listen(signalC); + } + } + + if (entries0 != null) + submit(entries0, curFut0); + + // Create compound future for this flush. + GridCompoundFuture<Object, Object> res = null; + + for (IgniteInternalFuture<Object> f : locFuts) { + if (res == null) + res = new GridCompoundFuture<>(); + + res.add(f); + } + + for (IgniteInternalFuture<Object> f : reqs.values()) { + if (res == null) + res = new GridCompoundFuture<>(); + + res.add(f); + } + + if (res != null) + res.markInitialized(); + + return res; + } + + /** + * Increments active tasks count. + * + * @throws IgniteInterruptedCheckedException If thread has been interrupted. + */ + private void incrementActiveTasks() throws IgniteInterruptedCheckedException { + U.acquire(sem); + } + + /** + * @param f Future that finished. + */ + private void signalTaskFinished(IgniteInternalFuture<Object> f) { + assert f != null; + + sem.release(); + } + + /** + * @param entries Entries to submit. + * @param curFut Current future. + * @throws IgniteInterruptedCheckedException If interrupted. + */ + private void submit(final Collection<DataStreamerEntry> entries, final GridFutureAdapter<Object> curFut) + throws IgniteInterruptedCheckedException { + assert entries != null; + assert !entries.isEmpty(); + assert curFut != null; + + incrementActiveTasks(); + + IgniteInternalFuture<Object> fut; + + if (isLocNode) { + fut = ctx.closure().callLocalSafe( + new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, updater), false); + + locFuts.add(fut); + + fut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() { + @Override public void apply(IgniteInternalFuture<Object> t) { + try { + boolean rmv = locFuts.remove(t); + + assert rmv; + + curFut.onDone(t.get()); + } + catch (IgniteCheckedException e) { + curFut.onDone(e); + } + } + }); + } + else { + try { + for (DataStreamerEntry e : entries) { + e.getKey().prepareMarshal(cacheObjCtx); + + CacheObject val = e.getValue(); + + if (val != null) + val.prepareMarshal(cacheObjCtx); + } + + if (updaterBytes == null) { + assert updater != null; + + updaterBytes = ctx.config().getMarshaller().marshal(updater); + } + + if (topicBytes == null) + topicBytes = ctx.config().getMarshaller().marshal(topic); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to marshal (request will not be sent).", e); + + return; + } + + GridDeployment dep = null; + GridPeerDeployAware jobPda0 = null; + + if (ctx.deploy().enabled()) { + try { + jobPda0 = jobPda; + + assert jobPda0 != null; + + dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader()); + + GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); + + if (cache != null) + cache.context().deploy().onEnter(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e); + + return; + } + + if (dep == null) + U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass()); + } + + long reqId = idGen.incrementAndGet(); + + fut = curFut; + + reqs.put(reqId, (GridFutureAdapter<Object>)fut); + + DataStreamerRequest req = new DataStreamerRequest( + reqId, + topicBytes, + cacheName, + updaterBytes, + entries, + true, + skipStore, + dep != null ? dep.deployMode() : null, + dep != null ? jobPda0.deployClass().getName() : null, + dep != null ? dep.userVersion() : null, + dep != null ? dep.participants() : null, + dep != null ? dep.classLoaderId() : null, + dep == null); + + try { + ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL); + + if (log.isDebugEnabled()) + log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']'); + } + catch (IgniteCheckedException e) { + if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id())) + ((GridFutureAdapter<Object>)fut).onDone(e); + else + ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " + + "request (node has left): " + node.id())); + } + } + } + + /** + * + */ + void onNodeLeft() { + assert !isLocNode; + assert bufMappings.get(node.id()) != this; + + if (log.isDebugEnabled()) + log.debug("Forcibly completing futures (node has left): " + node.id()); + + Exception e = new ClusterTopologyCheckedException("Failed to wait for request completion " + + "(node has left): " + node.id()); + + for (GridFutureAdapter<Object> f : reqs.values()) + f.onDone(e); + + // Make sure to complete current future. + GridFutureAdapter<Object> curFut0; + + synchronized (this) { + curFut0 = curFut; + } + + curFut0.onDone(e); + } + + /** + * @param res Response. + */ + void onResponse(DataStreamerResponse res) { + if (log.isDebugEnabled()) + log.debug("Received data load response: " + res); + + GridFutureAdapter<?> f = reqs.remove(res.requestId()); + + if (f == null) { + if (log.isDebugEnabled()) + log.debug("Future for request has not been found: " + res.requestId()); + + return; + } + + Throwable err = null; + + byte[] errBytes = res.errorBytes(); + + if (errBytes != null) { + try { + GridPeerDeployAware jobPda0 = jobPda; + + err = ctx.config().getMarshaller().unmarshal( + errBytes, + jobPda0 != null ? jobPda0.classLoader() : U.gridClassLoader()); + } + catch (IgniteCheckedException e) { + f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e)); + + return; + } + } + + f.onDone(null, err); + + if (log.isDebugEnabled()) + log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']'); + } + + /** + * + */ + void cancelAll() { + IgniteCheckedException err = new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this); + + for (IgniteInternalFuture<?> f : locFuts) { + try { + f.cancel(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to cancel mini-future.", e); + } + } + + for (GridFutureAdapter<?> f : reqs.values()) + f.onDone(err); + } + + /** {@inheritDoc} */ + @Override public String toString() { + int size; + + synchronized (this) { + size = entries.size(); + } + + return S.toString(Buffer.class, this, + "entriesCnt", size, + "locFutsSize", locFuts.size(), + "reqsSize", reqs.size()); + } + } + + /** + * Data streamer peer-deploy aware. + */ + private class DataStreamerPda implements GridPeerDeployAware { + /** */ + private static final long serialVersionUID = 0L; + + /** Deploy class. */ + private Class<?> cls; + + /** Class loader. */ + private ClassLoader ldr; + + /** Collection of objects to detect deploy class and class loader. */ + private Collection<Object> objs; + + /** + * Constructs data streamer peer-deploy aware. + * + * @param objs Collection of objects to detect deploy class and class loader. + */ + private DataStreamerPda(Object... objs) { + this.objs = Arrays.asList(objs); + } + + /** {@inheritDoc} */ + @Override public Class<?> deployClass() { + if (cls == null) { + Class<?> cls0 = null; + + if (depCls != null) + cls0 = depCls; + else { + for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) { + Object o = it.next(); + + if (o != null) + cls0 = U.detectClass(o); + } + + if (cls0 == null || U.isJdk(cls0)) + cls0 = DataStreamerImpl.class; + } + + assert cls0 != null : "Failed to detect deploy class [objs=" + objs + ']'; + + cls = cls0; + } + + return cls; + } + + /** {@inheritDoc} */ + @Override public ClassLoader classLoader() { + if (ldr == null) { + ClassLoader ldr0 = deployClass().getClassLoader(); + + // Safety. + if (ldr0 == null) + ldr0 = U.gridClassLoader(); + + assert ldr0 != null : "Failed to detect classloader [objs=" + objs + ']'; + + ldr = ldr0; + } + + return ldr; + } + } + + /** + * Isolated updater which only loads entry initial value. + */ + private static class IsolatedUpdater implements Updater<KeyCacheObject, CacheObject>, + DataStreamerCacheUpdaters.InternalUpdater { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void update(IgniteCache<KeyCacheObject, CacheObject> cache, + Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) { + IgniteCacheProxy<KeyCacheObject, CacheObject> proxy = (IgniteCacheProxy<KeyCacheObject, CacheObject>)cache; + + GridCacheAdapter<KeyCacheObject, CacheObject> internalCache = proxy.context().cache(); + + if (internalCache.isNear()) + internalCache = internalCache.context().near().dht(); + + GridCacheContext cctx = internalCache.context(); + - long topVer = cctx.affinity().affinityTopologyVersion(); ++ AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); + + GridCacheVersion ver = cctx.versions().next(topVer); + + for (Map.Entry<KeyCacheObject, CacheObject> e : entries) { + try { + e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader()); + + GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer); + + entry.unswap(true, false); + + entry.initialValue(e.getValue(), + ver, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, + false, + topVer, + GridDrType.DR_LOAD); + + cctx.evicts().touch(entry, topVer); + } + catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) { + // No-op. + } + catch (IgniteCheckedException ex) { + IgniteLogger log = cache.unwrap(Ignite.class).log(); + + U.error(log, "Failed to set initial value for cache entry: " + e, ex); + } + } + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java index fd59174,db550c3..a0618a3 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java @@@ -25,6 -26,7 +25,7 @@@ import org.apache.ignite.configuration. import org.apache.ignite.igfs.*; import org.apache.ignite.igfs.mapreduce.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; ++import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.util.ipc.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@@ -261,27 -262,16 +262,27 @@@ public class IgfsProcessor extends Igfs throw new IgniteCheckedException("Duplicate IGFS name found (check configuration and " + "assign unique name to each): " + name); - GridCacheAdapter<Object, Object> dataCache = ctx.cache().internalCache(cfg.getDataCacheName()); + CacheConfiguration dataCacheCfg = config(cfg.getDataCacheName()); + CacheConfiguration metaCacheCfg = config(cfg.getMetaCacheName()); - if (dataCache == null) + if (dataCacheCfg == null) throw new IgniteCheckedException("Data cache is not configured locally for IGFS: " + cfg); - if (dataCacheCfg.isQueryIndexEnabled()) - GridCache<Object, Object> metaCache = ctx.cache().cache(cfg.getMetaCacheName()); ++ if (GridQueryProcessor.isEnabled(dataCacheCfg)) + throw new IgniteCheckedException("IGFS data cache cannot start with enabled query indexing."); - if (metaCache == null) + if (dataCacheCfg.getAtomicityMode() != TRANSACTIONAL) + throw new IgniteCheckedException("Data cache should be transactional: " + cfg.getDataCacheName()); + + if (metaCacheCfg == null) throw new IgniteCheckedException("Metadata cache is not configured locally for IGFS: " + cfg); - if (metaCacheCfg.isQueryIndexEnabled()) ++ if (GridQueryProcessor.isEnabled(metaCacheCfg)) + throw new IgniteCheckedException("IGFS metadata cache cannot start with enabled query indexing."); + + if (metaCacheCfg.getAtomicityMode() != TRANSACTIONAL) + throw new IgniteCheckedException("Meta cache should be transactional: " + cfg.getMetaCacheName()); + if (F.eq(cfg.getDataCacheName(), cfg.getMetaCacheName())) throw new IgniteCheckedException("Cannot use same cache as both data and meta cache: " + cfg.getName()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index 2208341,fb8f4b8..bdd9de2 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@@ -18,8 -18,9 +18,10 @@@ package org.apache.ignite.internal.processors.query; import org.apache.ignite.*; + import org.apache.ignite.cache.query.*; + import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.lang.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfiguration.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java index ebeef17,6bda0ad..a138a2a --- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java @@@ -112,9 -111,8 +111,8 @@@ public class IgfsFragmentizerAbstractSe cfg.setCacheMode(PARTITIONED); cfg.setBackups(0); cfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(IGFS_GROUP_SIZE)); - cfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); + cfg.setNearConfiguration(null); cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - cfg.setQueryIndexEnabled(false); cfg.setAtomicityMode(TRANSACTIONAL); return cfg; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java index 5b9b11b,f9ff6b4..1ad7cb9 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java @@@ -34,8 -33,9 +34,8 @@@ import org.apache.ignite.testframework. import java.util.*; import java.util.concurrent.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheMode.*; - import static org.apache.ignite.cache.CachePreloadMode.*; + import static org.apache.ignite.cache.CacheRebalanceMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; /** @@@ -82,9 -82,8 +82,8 @@@ public class GridDiscoveryManagerAliveC cCfg.setCacheMode(PARTITIONED); cCfg.setBackups(1); - cCfg.setDistributionMode(NEAR_PARTITIONED); + cCfg.setNearConfiguration(new NearCacheConfiguration()); - cCfg.setPreloadMode(SYNC); - cCfg.setQueryIndexEnabled(false); + cCfg.setRebalanceMode(SYNC); cCfg.setWriteSynchronizationMode(FULL_SYNC); TcpDiscoverySpi disc = new TcpDiscoverySpi(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 364ac38,0c80ed6..186bd46 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@@ -4150,31 -4094,197 +4149,226 @@@ public abstract class GridCacheAbstract } /** + * @throws Exception If failed. + */ + public void testLocalClearKey() throws Exception { + addKeys(); + + String keyToRmv = "key" + 25; + + Ignite g = primaryIgnite(keyToRmv); + + g.<String, Integer>jcache(null).localClear(keyToRmv); + + checkLocalRemovedKey(keyToRmv); + + g.<String, Integer>jcache(null).put(keyToRmv, 1); + + String keyToEvict = "key" + 30; + + g = primaryIgnite(keyToEvict); + + g.<String, Integer>jcache(null).localEvict(Collections.singleton(keyToEvict)); + + g.<String, Integer>jcache(null).localClear(keyToEvict); + + checkLocalRemovedKey(keyToEvict); + } + + /** + * @param keyToRmv Removed key. + */ + private void checkLocalRemovedKey(String keyToRmv) { + for (int i = 0; i < 500; ++i) { + String key = "key" + i; + + boolean found = primaryIgnite(key).jcache(null).localPeek(key) != null; + + if (keyToRmv.equals(key)) { + Collection<ClusterNode> nodes = grid(0).affinity(null).mapKeyToPrimaryAndBackups(key); + + for (int j = 0; j < gridCount(); ++j) { + if (nodes.contains(grid(j).localNode()) && grid(j) != primaryIgnite(key)) + assertTrue("Not found on backup removed key ", grid(j).jcache(null).localPeek(key) != null); + } + + assertFalse("Found removed key " + key, found); + } + else + assertTrue("Not found key " + key, found); + } + } + + /** + * @throws Exception If failed. + */ + public void testLocalClearKeys() throws Exception { + Map<String, List<String>> keys = addKeys(); + + Ignite g = grid(0); + + Set<String> keysToRmv = new HashSet<>(); + + for (int i = 0; i < gridCount(); ++i) { + List<String> gridKeys = keys.get(grid(i).name()); + + if (gridKeys.size() > 2) { + keysToRmv.add(gridKeys.get(0)); + + keysToRmv.add(gridKeys.get(1)); + + g = grid(i); + + break; + } + } + + assert keysToRmv.size() > 1; + + g.<String, Integer>jcache(null).localClearAll(keysToRmv); + + for (int i = 0; i < 500; ++i) { + String key = "key" + i; + + boolean found = primaryIgnite(key).jcache(null).localPeek(key) != null; + + if (keysToRmv.contains(key)) + assertFalse("Found removed key " + key, found); + else + assertTrue("Not found key " + key, found); + } + } + + /** + * Add 500 keys to cache only on primaries nodes. + * + * @return Map grid's name to its primary keys. + */ + private Map<String, List<String>> addKeys() { + // Save entries only on their primary nodes. If we didn't do so, clearLocally() will not remove all entries + // because some of them were blocked due to having readers. + Map<String, List<String>> keys = new HashMap<>(); + + for (int i = 0; i < gridCount(); ++i) + keys.put(grid(i).name(), new ArrayList<String>()); + + for (int i = 0; i < 500; ++i) { + String key = "key" + i; + + Ignite g = primaryIgnite(key); + + g.jcache(null).put(key, "value" + i); + + keys.get(g.name()).add(key); + } + + return keys; + } + + /** + * @throws Exception If failed. + */ + public void testGlobalClearKey() throws Exception { + testGlobalClearKey(false, Arrays.asList("key25")); + } + + /** + * @throws Exception If failed. + */ + public void testGlobalClearKeyAsync() throws Exception { + testGlobalClearKey(true, Arrays.asList("key25")); + } + + /** + * @throws Exception If failed. + */ + public void testGlobalClearKeys() throws Exception { + testGlobalClearKey(false, Arrays.asList("key25", "key100", "key150")); + } + + /** + * @throws Exception If failed. + */ + public void testGlobalClearKeysAsync() throws Exception { + testGlobalClearKey(true, Arrays.asList("key25", "key100", "key150")); + } + + /** + * @param async If {@code true} uses async method. + * @param keysToRmv Keys to remove. + * @throws Exception If failed. + */ + protected void testGlobalClearKey(boolean async, Collection<String> keysToRmv) throws Exception { + // Save entries only on their primary nodes. If we didn't do so, clearLocally() will not remove all entries + // because some of them were blocked due to having readers. + for (int i = 0; i < 500; ++i) { + String key = "key" + i; + + Ignite g = primaryIgnite(key); + + g.jcache(null).put(key, "value" + i); + } + + if (async) { + IgniteCache<String, Integer> asyncCache = jcache().withAsync(); + + if (keysToRmv.size() == 1) + asyncCache.clear(F.first(keysToRmv)); + else + asyncCache.clearAll(new HashSet<>(keysToRmv)); + + asyncCache.future().get(); + } + else { + if (keysToRmv.size() == 1) + jcache().clear(F.first(keysToRmv)); + else + jcache().clearAll(new HashSet<>(keysToRmv)); + } + + for (int i = 0; i < 500; ++i) { + String key = "key" + i; + + boolean found = false; + + for (int j = 0; j < gridCount(); j++) { + if (jcache(j).localPeek(key) != null) + found = true; + } + + if (!keysToRmv.contains(key)) + assertTrue("Not found key " + key, found); + else + assertFalse("Found removed key " + key, found); + } + } ++ ++ /** + * + */ + protected CacheStartMode cacheStartType() { + String mode = System.getProperty("cache.start.mode"); + + if (CacheStartMode.NODES_THEN_CACHES.name().equalsIgnoreCase(mode)) + return CacheStartMode.NODES_THEN_CACHES; + + if (CacheStartMode.ONE_BY_ONE.name().equalsIgnoreCase(mode)) + return CacheStartMode.ONE_BY_ONE; + + return CacheStartMode.STATIC; + } + + /** + * + */ + public enum CacheStartMode { + /** Start caches together nodes (not dynamically) */ + STATIC, + + /** */ + NODES_THEN_CACHES, + + /** */ + ONE_BY_ONE + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java index b4adbf4,3d7ed58..54c9316 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java @@@ -119,9 -119,9 +119,9 @@@ public abstract class GridCacheAbstract cacheCfg.setName(cacheName); cacheCfg.setCacheMode(getCacheMode()); cacheCfg.setAtomicityMode(getAtomicMode()); - cacheCfg.setDistributionMode(getDistributionMode()); + cacheCfg.setNearConfiguration(nearConfiguration()); cacheCfg.setWriteSynchronizationMode(FULL_SYNC); - cacheCfg.setPreloadMode(SYNC); + cacheCfg.setRebalanceMode(SYNC); if (gridName.endsWith("1")) cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory<CacheStore>(LOCAL_STORE_1)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java index e4e2fc8,dacbf63..d3d83e2 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java @@@ -252,7 -251,8 +252,8 @@@ public abstract class GridCacheAbstract cfg.setCacheMode(cacheMode()); cfg.setAtomicityMode(atomicityMode()); cfg.setWriteSynchronizationMode(writeSynchronization()); - cfg.setDistributionMode(distributionMode()); + cfg.setNearConfiguration(nearConfiguration()); + cfg.setIndexedTypes(indexedTypes()); if (cacheMode() == PARTITIONED) cfg.setBackups(1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java index f7f05d6,2d551a1..b28b268 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java @@@ -64,9 -66,9 +66,9 @@@ public abstract class GridCacheAbstract cacheCfg.setName(null); cacheCfg.setCacheMode(getCacheMode()); cacheCfg.setAtomicityMode(getAtomicMode()); - cacheCfg.setDistributionMode(getDistributionMode()); + cacheCfg.setNearConfiguration(nearConfiguration()); cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - cacheCfg.setPreloadMode(CachePreloadMode.SYNC); + cacheCfg.setRebalanceMode(SYNC); cacheCfg.setAffinityMapper(AFFINITY_MAPPER); cfg.setCacheConfiguration(cacheCfg); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicStoreAbstractTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicStoreAbstractTest.java index 9b32b69,69bcef2..fa35de3 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicStoreAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicStoreAbstractTest.java @@@ -88,7 -88,8 +88,7 @@@ public abstract class GridCacheBasicSto cc.setWriteSynchronizationMode(FULL_SYNC); cc.setSwapEnabled(false); cc.setAtomicityMode(atomicityMode()); - cc.setPreloadMode(SYNC); - cc.setDistributionMode(distributionMode()); + cc.setRebalanceMode(SYNC); cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); cc.setReadThrough(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java index 1a42930,d3de9ff..f0212bb --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java @@@ -45,8 -46,9 +46,8 @@@ import java.util.* import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheMode.*; - import static org.apache.ignite.cache.CachePreloadMode.*; + import static org.apache.ignite.cache.CacheRebalanceMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; import static org.apache.ignite.transactions.TransactionConcurrency.*; import static org.apache.ignite.transactions.TransactionIsolation.*; @@@ -110,11 -112,13 +111,11 @@@ public class GridCacheConcurrentTxMulti CacheConfiguration cc = defaultCacheConfiguration(); cc.setCacheMode(mode); - cc.setDistributionMode(PARTITIONED_ONLY); cc.setEvictionPolicy(new CacheLruEvictionPolicy(1000)); cc.setEvictSynchronized(false); - cc.setEvictNearSynchronized(false); cc.setSwapEnabled(false); cc.setWriteSynchronizationMode(FULL_SYNC); - cc.setPreloadMode(NONE); + cc.setRebalanceMode(NONE); c.setCacheConfiguration(cc); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java index 16ebacd,d412e77..24b69ec --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java @@@ -46,8 -46,9 +46,8 @@@ import java.util.* import java.util.concurrent.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheMode.*; - import static org.apache.ignite.cache.CachePreloadMode.*; + import static org.apache.ignite.cache.CacheRebalanceMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; import static org.apache.ignite.configuration.DeploymentMode.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentOffHeapSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java index 302d070,83a23b7..bf56659 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java @@@ -31,8 -31,9 +31,8 @@@ import org.apache.ignite.testframework. import java.util.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheMode.*; - import static org.apache.ignite.cache.CachePreloadMode.*; + import static org.apache.ignite.cache.CacheRebalanceMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; import static org.apache.ignite.configuration.DeploymentMode.*; @@@ -100,9 -101,9 +100,9 @@@ public class GridCacheDeploymentSelfTes cfg.setCacheMode(PARTITIONED); cfg.setWriteSynchronizationMode(FULL_SYNC); - cfg.setPreloadMode(SYNC); + cfg.setRebalanceMode(SYNC); cfg.setAtomicityMode(TRANSACTIONAL); - cfg.setDistributionMode(NEAR_PARTITIONED); + cfg.setNearConfiguration(new NearCacheConfiguration()); cfg.setBackups(1); return cfg; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java index 3098a26,d890da9..858de64 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java @@@ -33,7 -33,8 +33,7 @@@ import java.util.concurrent.* import java.util.concurrent.atomic.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; - import static org.apache.ignite.cache.CachePreloadMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; + import static org.apache.ignite.cache.CacheRebalanceMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; /** @@@ -85,7 -86,8 +85,7 @@@ public abstract class GridCacheGetAndTr cc.setWriteSynchronizationMode(FULL_SYNC); cc.setSwapEnabled(false); cc.setAtomicityMode(atomicityMode()); - cc.setPreloadMode(SYNC); - cc.setDistributionMode(distributionMode()); + cc.setRebalanceMode(SYNC); cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); cc.setReadThrough(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIncrementTransformTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIncrementTransformTest.java index 10afc73,6950d5c..0680454 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIncrementTransformTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIncrementTransformTest.java @@@ -34,8 -34,9 +34,8 @@@ import java.util.concurrent.atomic.* import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheMode.*; - import static org.apache.ignite.cache.CachePreloadMode.*; + import static org.apache.ignite.cache.CacheRebalanceMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckSelfTest.java index f274f36,d21d20c..a928ea3 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckSelfTest.java @@@ -71,9 -72,8 +71,8 @@@ public class GridCacheKeyCheckSelfTest cfg.setCacheMode(PARTITIONED); cfg.setBackups(1); - cfg.setDistributionMode(distributionMode()); + cfg.setNearConfiguration(nearConfiguration()); cfg.setWriteSynchronizationMode(FULL_SYNC); - cfg.setQueryIndexEnabled(false); cfg.setAtomicityMode(atomicityMode); return cfg; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java index 0f8984c,2cf509e..a03798e --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java @@@ -75,9 -76,8 +75,8 @@@ public class GridCacheLeakTest extends cfg.setCacheMode(PARTITIONED); cfg.setBackups(1); - cfg.setDistributionMode(PARTITIONED_ONLY); + cfg.setNearConfiguration(null); cfg.setWriteSynchronizationMode(FULL_SYNC); - cfg.setQueryIndexEnabled(false); cfg.setAtomicityMode(atomicityMode); return cfg; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMultiUpdateLockSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMultiUpdateLockSelfTest.java index ce6153b,0fcbaef..d360df5 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMultiUpdateLockSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMultiUpdateLockSelfTest.java @@@ -33,8 -32,9 +33,8 @@@ import org.apache.ignite.transactions.* import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheMode.*; - import static org.apache.ignite.cache.CachePreloadMode.*; + import static org.apache.ignite.cache.CacheRebalanceMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; import static org.apache.ignite.transactions.TransactionConcurrency.*; import static org.apache.ignite.transactions.TransactionIsolation.*; @@@ -74,12 -74,10 +74,12 @@@ public class GridCacheMultiUpdateLockSe cfg.setCacheMode(PARTITIONED); cfg.setBackups(1); - cfg.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); + + if (!nearEnabled) + cfg.setNearConfiguration(null); cfg.setWriteSynchronizationMode(FULL_SYNC); - cfg.setPreloadMode(SYNC); + cfg.setRebalanceMode(SYNC); return cfg; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTest.java index 690d4e5,bce17b7..52076cd --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTest.java @@@ -72,8 -73,7 +72,7 @@@ public class GridCacheOffHeapTest exten cacheCfg.setWriteSynchronizationMode(FULL_ASYNC); cacheCfg.setSwapEnabled(false); cacheCfg.setCacheMode(mode); - cacheCfg.setQueryIndexEnabled(false); - cacheCfg.setDistributionMode(PARTITIONED_ONLY); + cacheCfg.setNearConfiguration(null); cacheCfg.setStartSize(startSize); if (onheap > 0) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheP2PUndeploySelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheP2PUndeploySelfTest.java index 8c0743a,7b429b2..355893c --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheP2PUndeploySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheP2PUndeploySelfTest.java @@@ -35,8 -35,9 +35,8 @@@ import org.apache.ignite.testframework. import java.util.concurrent.atomic.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheMode.*; - import static org.apache.ignite.cache.CachePreloadMode.*; + import static org.apache.ignite.cache.CacheRebalanceMode.*; import static org.apache.ignite.configuration.DeploymentMode.*; /** @@@ -95,11 -95,12 +94,10 @@@ public class GridCacheP2PUndeploySelfTe partCacheCfg.setName("partitioned"); partCacheCfg.setCacheMode(PARTITIONED); - partCacheCfg.setPreloadMode(mode); + partCacheCfg.setRebalanceMode(mode); partCacheCfg.setAffinity(new GridCacheModuloAffinityFunction(11, 1)); partCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - partCacheCfg.setQueryIndexEnabled(false); - partCacheCfg.setEvictNearSynchronized(false); partCacheCfg.setAtomicityMode(TRANSACTIONAL); - partCacheCfg.setDistributionMode(NEAR_PARTITIONED); if (offheap) partCacheCfg.setOffHeapMaxMemory(OFFHEAP);