http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 0000000,e6ff9b5..9a086cd mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@@ -1,0 -1,712 +1,705 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.transactions; + + import org.apache.ignite.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.managers.communication.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.processors.cache.version.*; + import org.apache.ignite.internal.processors.timeout.*; + import org.apache.ignite.internal.transactions.*; + import org.apache.ignite.internal.util.lang.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.transactions.*; + import org.jetbrains.annotations.*; + + import javax.cache.*; + import java.util.*; + + /** + * Transaction managed by cache ({@code 'Ex'} stands for external). + */ + public interface IgniteInternalTx<K, V> extends AutoCloseable, GridTimeoutObject { + /** + * + */ + @SuppressWarnings("PublicInnerClass") + public enum FinalizationStatus { + /** Transaction was not finalized yet. */ + NONE, + + /** Transaction is being finalized by user. */ + USER_FINISH, + + /** Recovery request is received, user finish requests should be ignored. */ + RECOVERY_WAIT, + + /** Transaction is being finalized by recovery procedure. */ + RECOVERY_FINISH + } + + /** + * Gets unique identifier for this transaction. + * + * @return Transaction UID. + */ + public IgniteUuid xid(); + + /** + * ID of the node on which this transaction started. + * + * @return Originating node ID. + */ + public UUID nodeId(); + + /** + * ID of the thread in which this transaction started. + * + * @return Thread ID. + */ + public long threadId(); + + /** + * Start time of this transaction. + * + * @return Start time of this transaction on this node. + */ + public long startTime(); + + /** + * Cache transaction isolation level. + * + * @return Isolation level. + */ + public IgniteTxIsolation isolation(); + + /** + * Cache transaction concurrency mode. + * + * @return Concurrency mode. + */ + public IgniteTxConcurrency concurrency(); + + /** + * Flag indicating whether transaction was started automatically by the + * system or not. System will start transactions implicitly whenever + * any cache {@code put(..)} or {@code remove(..)} operation is invoked + * outside of transaction. + * + * @return {@code True} if transaction was started implicitly. + */ + public boolean implicit(); + + /** + * Get invalidation flag for this transaction. If set to {@code true}, then + * remote values will be {@code invalidated} (set to {@code null}) instead + * of updated. + * <p> + * Invalidation messages don't carry new values, so they are a lot lighter + * than update messages. However, when a value is accessed on a node after + * it's been invalidated, it must be loaded from persistent store. + * + * @return Invalidation flag. + */ + public boolean isInvalidate(); + + /** + * Gets current transaction state value. + * + * @return Current transaction state. + */ + public IgniteTxState state(); + + /** + * Gets timeout value in milliseconds for this transaction. If transaction times + * out prior to it's completion, {@link org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException} will be thrown. + * + * @return Transaction timeout value. + */ + public long timeout(); + + /** + * Sets transaction timeout value. This value can be set only before a first operation + * on transaction has been performed. + * + * @param timeout Transaction timeout value. + * @return Previous timeout. + */ + public long timeout(long timeout); + + /** + * Modify the transaction associated with the current thread such that the + * only possible outcome of the transaction is to roll back the + * transaction. + * + * @return {@code True} if rollback-only flag was set as a result of this operation, + * {@code false} if it was already set prior to this call or could not be set + * because transaction is already finishing up committing or rolling back. + */ + public boolean setRollbackOnly(); + + /** + * If transaction was marked as rollback-only. + * + * @return {@code True} if transaction can only be rolled back. + */ + public boolean isRollbackOnly(); + + /** + * Commits this transaction by initiating {@code two-phase-commit} process. + * + * @throws IgniteCheckedException If commit failed. + */ + @IgniteAsyncSupported + public void commit() throws IgniteCheckedException; + + /** + * Ends the transaction. Transaction will be rolled back if it has not been committed. + * + * @throws IgniteCheckedException If transaction could not be gracefully ended. + */ + @Override public void close() throws IgniteCheckedException; + + /** + * Rolls back this transaction. + * + * @throws IgniteCheckedException If rollback failed. + */ + @IgniteAsyncSupported + public void rollback() throws IgniteCheckedException; + + /** + * Removes metadata by name. + * + * @param name Name of the metadata to remove. + * @param <T> Type of the value. + * @return Value of removed metadata or {@code null}. + */ + @Nullable public <T> T removeMeta(String name); + + /** + * Gets metadata by name. + * + * @param name Metadata name. + * @param <T> Type of the value. + * @return Metadata value or {@code null}. + */ + @Nullable public <T> T meta(String name); + + /** + * Adds a new metadata. + * + * @param name Metadata name. + * @param val Metadata value. + * @param <T> Type of the value. + * @return Metadata previously associated with given name, or + * {@code null} if there was none. + */ + @Nullable public <T> T addMeta(String name, T val); + + /** + * @return Size of the transaction. + */ + public int size(); + + /** + * @return {@code True} if transaction is allowed to use store. + */ + public boolean storeEnabled(); + + /** + * @return {@code True} if transaction is allowed to use store and transactions spans one or more caches with + * store enabled. + */ + public boolean storeUsed(); + + /** + * Checks if this is system cache transaction. System transactions are isolated from user transactions + * because some of the public API methods may be invoked inside user transactions and internally start + * system cache transactions. + * + * @return {@code True} if transaction is started for system cache. + */ + public boolean system(); + + /** + * @return Pool where message for the given transaction must be processed. + */ + public GridIoPolicy ioPolicy(); + + /** + * @return Last recorded topology version. + */ + public long topologyVersion(); + + /** + * @return Flag indicating whether transaction is implicit with only one key. + */ + public boolean implicitSingle(); + + /** + * @return Collection of cache IDs involved in this transaction. + */ + public Collection<Integer> activeCacheIds(); + + /** + * Attempts to set topology version and returns the current value. + * If topology version was previously set, then it's value will + * be returned (but not updated). + * + * @param topVer Topology version. + * @return Recorded topology version. + */ + public long topologyVersion(long topVer); + + /** + * @return {@code True} if transaction is empty. + */ + public boolean empty(); + + /** + * @return {@code True} if transaction group-locked. + */ + public boolean groupLock(); + + /** + * @return Group lock key if {@link #groupLock()} is {@code true}. + */ + @Nullable public IgniteTxKey groupLockKey(); + + /** + * @return {@code True} if preparing flag was set with this call. + */ + public boolean markPreparing(); + + /** + * @param status Finalization status to set. + * @return {@code True} if could mark was set. + */ + public boolean markFinalizing(FinalizationStatus status); + + /** + * @param cacheCtx Cache context. + * @param part Invalid partition. + */ + public void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int part); + + /** + * @return Invalid partitions. + */ + public Set<Integer> invalidPartitions(); + + /** + * Gets owned version for near remote transaction. + * + * @param key Key to get version for. + * @return Owned version, if any. + */ + @Nullable public GridCacheVersion ownedVersion(IgniteTxKey<K> key); + + /** + * Gets ID of additional node involved. For example, in DHT case, other node is + * near node ID. + * + * @return Parent node IDs. + */ + @Nullable public UUID otherNodeId(); + + /** + * @return Event node ID. + */ + public UUID eventNodeId(); + + /** + * Gets node ID which directly started this transaction. In case of DHT local transaction it will be + * near node ID, in case of DHT remote transaction it will be primary node ID, in case of replicated remote + * transaction it will be starter node ID. + * + * @return Originating node ID. + */ + public UUID originatingNodeId(); + + /** + * @return Master node IDs. + */ + public Collection<UUID> masterNodeIds(); + + /** + * @return Near transaction ID. + */ + @Nullable public GridCacheVersion nearXidVersion(); + + /** + * @return Transaction nodes mapping (primary node -> related backup nodes). + */ + @Nullable public Map<UUID, Collection<UUID>> transactionNodes(); + + /** + * @param entry Entry to check. + * @return {@code True} if lock is owned. + * @throws GridCacheEntryRemovedException If entry has been removed. + */ + public boolean ownsLock(GridCacheEntryEx<K, V> entry) throws GridCacheEntryRemovedException; + + /** + * @param entry Entry to check. + * @return {@code True} if lock is owned. + */ + public boolean ownsLockUnsafe(GridCacheEntryEx<K, V> entry); + + /** + * For Partitioned caches, this flag is {@code false} for remote DHT and remote NEAR + * transactions because serializability of transaction is enforced on primary node. All + * other transaction types must enforce it. + * + * @return Enforce serializable flag. + */ + public boolean enforceSerializable(); + + /** + * @return {@code True} if near transaction. + */ + public boolean near(); + + /** + * @return {@code True} if DHT transaction. + */ + public boolean dht(); + + /** + * @return {@code True} if dht colocated transaction. + */ + public boolean colocated(); + + /** + * @return {@code True} if transaction is local, {@code false} if it's remote. + */ + public boolean local(); + + /** + * @return {@code True} if transaction is replicated. + */ + public boolean replicated(); + + /** + * @return Subject ID initiated this transaction. + */ + public UUID subjectId(); + + /** + * Task name hash in case if transaction was initiated within task execution. + * + * @return Task name hash. + */ + public int taskNameHash(); + + /** + * @return {@code True} if transaction is user transaction, which means: + * <ul> + * <li>Explicit</li> + * <li>Local</li> + * <li>Not DHT</li> + * </ul> + */ + public boolean user(); + + /** + * @return {@code True} if transaction is configured with synchronous commit flag. + */ + public boolean syncCommit(); + + /** + * @return {@code True} if transaction is configured with synchronous rollback flag. + */ + public boolean syncRollback(); + + /** + * @param key Key to check. + * @return {@code True} if key is present. + */ + public boolean hasWriteKey(IgniteTxKey<K> key); + + /** + * @return Read set. + */ + public Set<IgniteTxKey<K>> readSet(); + + /** + * @return Write set. + */ + public Set<IgniteTxKey<K>> writeSet(); + + /** + * @return All transaction entries. + */ + public Collection<IgniteTxEntry<K, V>> allEntries(); + + /** + * @return Write entries. + */ + public Collection<IgniteTxEntry<K, V>> writeEntries(); + + /** + * @return Read entries. + */ + public Collection<IgniteTxEntry<K, V>> readEntries(); + + /** + * @return Transaction write map. + */ + public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> writeMap(); + + /** + * @return Transaction read map. + */ + public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> readMap(); + + /** - * Gets pessimistic recovery writes, i.e. values that have never been sent to remote nodes with lock requests. - * - * @return Collection of recovery writes. - */ - public Collection<IgniteTxEntry<K, V>> recoveryWrites(); - - /** + * Gets a list of entries that needs to be locked on the next step of prepare stage of + * optimistic transaction. + * + * @return List of tx entries for optimistic locking. + */ + public Collection<IgniteTxEntry<K, V>> optimisticLockEntries(); + + /** + * Seals transaction for updates. + */ + public void seal(); + + /** + * @param key Key for the entry. + * @return Entry for the key (either from write set or read set). + */ + @Nullable public IgniteTxEntry<K, V> entry(IgniteTxKey<K> key); + + /** + * @param ctx Cache context. + * @param failFast Fail-fast flag. + * @param key Key to look up. + * @param filter Filter to check. + * @return Current value for the key within transaction. + * @throws GridCacheFilterFailedException If filter failed and failFast is {@code true}. + */ + @Nullable public GridTuple<V> peek( + GridCacheContext<K, V> ctx, + boolean failFast, + K key, + @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws GridCacheFilterFailedException; + + /** + * @return Start version. + */ + public GridCacheVersion startVersion(); + + /** + * @return Transaction version. + */ + public GridCacheVersion xidVersion(); + + /** + * @return Version created at commit time. + */ + public GridCacheVersion commitVersion(); + + /** + * @param commitVer Commit version. + * @return {@code True} if version was set. + */ + public boolean commitVersion(GridCacheVersion commitVer); + + /** + * @return End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>) + * assigned to this transaction at the end of write phase. + */ + public GridCacheVersion endVersion(); + + /** + * Prepare state. + * + * @throws IgniteCheckedException If failed. + */ + public void prepare() throws IgniteCheckedException; + + /** + * Prepare stage. + * + * @return Future for prepare step. + */ + public IgniteInternalFuture<IgniteInternalTx<K, V>> prepareAsync(); + + /** + * @param endVer End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>) + * assigned to this transaction at the end of write phase. + */ + public void endVersion(GridCacheVersion endVer); + + /** + * @return Transaction write version. For all transactions except DHT transactions, will be equal to + * {@link #xidVersion()}. + */ + public GridCacheVersion writeVersion(); + + /** + * Sets write version. + * + * @param ver Write version. + */ + public void writeVersion(GridCacheVersion ver); + + /** + * @return Future for transaction completion. + */ + public IgniteInternalFuture<IgniteInternalTx> finishFuture(); + + /** + * @param state Transaction state. + * @return {@code True} if transition was valid, {@code false} otherwise. + */ + public boolean state(IgniteTxState state); + + /** + * @param invalidate Invalidate flag. + */ + public void invalidate(boolean invalidate); + + /** + * @param sysInvalidate System invalidate flag. + */ + public void systemInvalidate(boolean sysInvalidate); + + /** + * @return System invalidate flag. + */ + public boolean isSystemInvalidate(); + + /** + * Asynchronously rollback this transaction. + * + * @return Rollback future. + */ + public IgniteInternalFuture<IgniteInternalTx> rollbackAsync(); + + /** + * Asynchronously commits this transaction by initiating {@code two-phase-commit} process. + * + * @return Future for commit operation. + */ + public IgniteInternalFuture<IgniteInternalTx> commitAsync(); + + /** + * Callback invoked whenever there is a lock that has been acquired + * by this transaction for any of the participating entries. + * + * @param entry Cache entry. + * @param owner Lock candidate that won ownership of the lock. + * @return {@code True} if transaction cared about notification. + */ + public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner); + + /** + * @return {@code True} if transaction timed out. + */ + public boolean timedOut(); + + /** + * @return {@code True} if transaction had completed successfully or unsuccessfully. + */ + public boolean done(); + + /** + * @return {@code True} for OPTIMISTIC transactions. + */ + public boolean optimistic(); + + /** + * @return {@code True} for PESSIMISTIC transactions. + */ + public boolean pessimistic(); + + /** + * @return {@code True} if read-committed. + */ + public boolean readCommitted(); + + /** + * @return {@code True} if repeatable-read. + */ + public boolean repeatableRead(); + + /** + * @return {@code True} if serializable. + */ + public boolean serializable(); + + /** + * Checks whether given key has been removed within transaction. + * + * @param key Key to check. + * @return {@code True} if key has been removed. + */ + public boolean removed(IgniteTxKey<K> key); + + /** + * Gets allowed remaining time for this transaction. + * + * @return Remaining time. + * @throws IgniteTxTimeoutCheckedException If transaction timed out. + */ + public long remainingTime() throws IgniteTxTimeoutCheckedException; + + /** + * @return Alternate transaction versions. + */ + public Collection<GridCacheVersion> alternateVersions(); + + /** + * @return {@code True} if transaction needs completed versions for processing. + */ + public boolean needsCompletedVersions(); + + /** + * @param base Base for committed versions. + * @param committed Committed transactions relative to base. + * @param rolledback Rolled back transactions relative to base. + */ + public void completedVersions(GridCacheVersion base, Collection<GridCacheVersion> committed, + Collection<GridCacheVersion> rolledback); + + /** + * @return {@code True} if transaction has at least one internal entry. + */ + public boolean internal(); + + /** + * @return {@code True} if transaction is a one-phase-commit transaction. + */ + public boolean onePhaseCommit(); + + /** + * @return {@code True} if transaction has transform entries. This flag will be only set for local + * transactions. + */ + public boolean hasTransforms(); + + /** + * @return Public API proxy. + */ + public IgniteTxProxy proxy(); + }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 911516b,7eb966d..9478769 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@@ -20,13 -20,14 +20,17 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; + import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; + import org.apache.ignite.internal.processors.cache.version.*; + import org.apache.ignite.internal.transactions.*; + import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; @@@ -52,9 -53,9 +56,9 @@@ public class IgniteTxHandler<K, V> /** Shared cache context. */ private GridCacheSharedContext<K, V> ctx; - public IgniteInternalFuture<IgniteTxEx<K, V>> processNearTxPrepareRequest(final UUID nearNodeId, + public IgniteInternalFuture<IgniteInternalTx<K, V>> processNearTxPrepareRequest(final UUID nearNodeId, final GridNearTxPrepareRequest<K, V> req) { - return prepareTx(nearNodeId, null, req); + return prepareTx(nearNodeId, null, req, null); } /** @@@ -134,12 -149,8 +138,12 @@@ * @param req Near prepare request. * @return Future for transaction. */ - public IgniteInternalFuture<IgniteTxEx<K, V>> prepareTx( - public IgniteInternalFuture<IgniteInternalTx<K, V>> prepareTx(final UUID nearNodeId, @Nullable GridNearTxLocal<K, V> locTx, - final GridNearTxPrepareRequest<K, V> req) { ++ public IgniteInternalFuture<IgniteInternalTx<K, V>> prepareTx( + UUID nearNodeId, + @Nullable GridNearTxLocal<K, V> locTx, + GridNearTxPrepareRequest<K, V> req, + @Nullable IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb + ) { assert nearNodeId != null; assert req != null; @@@ -166,11 -175,8 +170,11 @@@ * @param req Near prepare request. * @return Prepare future. */ - private IgniteInternalFuture<IgniteTxEx<K, V>> prepareColocatedTx( - private IgniteInternalFuture<IgniteInternalTx<K, V>> prepareColocatedTx(final GridNearTxLocal<K, V> locTx, - final GridNearTxPrepareRequest<K, V> req) { ++ private IgniteInternalFuture<IgniteInternalTx<K, V>> prepareColocatedTx( + final GridNearTxLocal<K, V> locTx, + final GridNearTxPrepareRequest<K, V> req, + final IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb + ) { IgniteInternalFuture<Object> fut = new GridFinishedFutureEx<>(); // TODO force preload keys. @@@ -182,13 -188,8 +186,13 @@@ if (ex != null) throw new GridClosureException(ex); - IgniteInternalFuture<IgniteTxEx<K, V>> fut = locTx.prepareAsyncLocal( - IgniteInternalFuture<IgniteInternalTx<K, V>> fut = locTx.prepareAsyncLocal(req.reads(), req.writes(), - req.transactionNodes(), req.last(), req.lastBackups()); ++ IgniteInternalFuture<IgniteInternalTx<K, V>> fut = locTx.prepareAsyncLocal( + req.reads(), + req.writes(), + req.transactionNodes(), + req.last(), + req.lastBackups(), + completeCb); if (locTx.isRollbackOnly()) locTx.rollbackAsync(); @@@ -220,11 -221,8 +224,11 @@@ * @param req Near prepare request. * @return Prepare future. */ - private IgniteInternalFuture<IgniteTxEx<K, V>> prepareNearTx( - private IgniteInternalFuture<IgniteInternalTx<K, V>> prepareNearTx(final UUID nearNodeId, - final GridNearTxPrepareRequest<K, V> req) { ++ private IgniteInternalFuture<IgniteInternalTx<K, V>> prepareNearTx( + final UUID nearNodeId, + final GridNearTxPrepareRequest<K, V> req, + IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb + ) { ClusterNode nearNode = ctx.node(nearNodeId); if (nearNode == null) { @@@ -291,28 -285,9 +295,28 @@@ } if (tx != null) { - IgniteInternalFuture<IgniteInternalTx<K, V>> fut = tx.prepareAsync(req.reads(), req.writes(), - req.dhtVersions(), req.messageId(), req.miniId(), req.transactionNodes(), req.last(), - req.lastBackups()); + tx.transactionNodes(req.transactionNodes()); + + if (req.onePhaseCommit()) { + assert req.last(); + assert F.isEmpty(req.lastBackups()) || req.lastBackups().size() <= 1; + + tx.onePhaseCommit(true); + } + + if (req.returnValue()) + tx.needReturnValue(true); + - IgniteInternalFuture<IgniteTxEx<K, V>> fut = tx.prepareAsync( ++ IgniteInternalFuture<IgniteInternalTx<K, V>> fut = tx.prepareAsync( + req.reads(), + req.writes(), + req.dhtVersions(), + req.messageId(), + req.miniId(), + req.transactionNodes(), + req.last(), + req.lastBackups(), + completeCb); if (tx.isRollbackOnly()) { try { @@@ -351,10 -326,11 +355,10 @@@ */ private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse<K, V> res) { GridNearTxPrepareFuture<K, V> fut = (GridNearTxPrepareFuture<K, V>)ctx.mvcc() - .<IgniteTxEx<K, V>>future(res.version(), res.futureId()); + .<IgniteInternalTx<K, V>>future(res.version(), res.futureId()); if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Failed to find future for prepare response [sender=" + nodeId + ", res=" + res + ']'); + U.warn(log, "Failed to find future for prepare response [sender=" + nodeId + ", res=" + res + ']'); return; } @@@ -452,13 -428,17 +456,13 @@@ if (locTx != null && locTx.colocatedLocallyMapped()) colocatedFinishFut = finishColocatedLocal(req.commit(), locTx); - IgniteInternalFuture<IgniteTx> nearFinishFut = null; + IgniteInternalFuture<IgniteInternalTx> nearFinishFut = null; - if (locTx == null || locTx.nearLocallyMapped()) { - if (locTx != null) - req.cloneEntries(); - + if (locTx == null || locTx.nearLocallyMapped()) nearFinishFut = finishDhtLocal(nodeId, locTx, req); - } if (colocatedFinishFut != null && nearFinishFut != null) { - GridCompoundFuture<IgniteTx, IgniteTx> res = new GridCompoundFuture<>(ctx.kernalContext()); + GridCompoundFuture<IgniteInternalTx, IgniteInternalTx> res = new GridCompoundFuture<>(ctx.kernalContext()); res.add(colocatedFinishFut); res.add(nearFinishFut); @@@ -569,8 -549,22 +573,8 @@@ tx.nearFinishFutureId(req.futureId()); tx.nearFinishMiniId(req.miniId()); - tx.recoveryWrites(req.recoveryWrites()); - - Collection<IgniteTxEntry<K, V>> writeEntries = req.writes(); - - if (!F.isEmpty(writeEntries)) { - // In OPTIMISTIC mode, we get the values at PREPARE stage. - assert tx.concurrency() == PESSIMISTIC; - - for (IgniteTxEntry<K, V> entry : writeEntries) - tx.addEntry(req.messageId(), entry); - } - - if (tx.pessimistic()) - tx.prepare(); - IgniteInternalFuture<IgniteTx> commitFut = tx.commitAsync(); + IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitAsync(); // Only for error logging. commitFut.listenAsync(CU.errorLogger(log)); @@@ -672,29 -664,13 +676,29 @@@ if (dhtTx != null && !F.isEmpty(dhtTx.invalidPartitions())) res.invalidPartitions(dhtTx.invalidPartitions()); + + if (req.onePhaseCommit()) { + assert req.last(); + + if (dhtTx != null) { + dhtTx.onePhaseCommit(true); + + finish(nodeId, dhtTx, req); + } + + if (nearTx != null) { + nearTx.onePhaseCommit(true); + + finish(nodeId, nearTx, req); + } + } } catch (IgniteCheckedException e) { - if (e instanceof IgniteTxRollbackException) + if (e instanceof IgniteTxRollbackCheckedException) - U.error(log, "Transaction was rolled back before prepare completed: " + dhtTx, e); + U.error(log, "Transaction was rolled back before prepare completed: " + req, e); - else if (e instanceof IgniteTxOptimisticException) { + else if (e instanceof IgniteTxOptimisticCheckedException) { if (log.isDebugEnabled()) - log.debug("Optimistic failure for remote transaction (will rollback): " + dhtTx); + log.debug("Optimistic failure for remote transaction (will rollback): " + req); } else U.error(log, "Failed to process prepare request: " + req, e); @@@ -746,20 -779,12 +750,20 @@@ if (nearTx != null && nearTx.local()) nearTx = null; - finish(nodeId, dhtTx, req, req.writes(), req.ttls()); + finish(nodeId, dhtTx, req); if (nearTx != null) - finish(nodeId, nearTx, req, req.nearWrites(), req.nearTtls()); + finish(nodeId, nearTx, req); - sendReply(nodeId, req); + if (dhtTx != null && !dhtTx.done()) { - dhtTx.finishFuture().listenAsync(new CI1<IgniteInternalFuture<IgniteTx>>() { - @Override public void apply(IgniteInternalFuture<IgniteTx> igniteTxIgniteFuture) { ++ dhtTx.finishFuture().listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { ++ @Override public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) { + sendReply(nodeId, req); + } + }); + } + else + sendReply(nodeId, req); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 400daff,9f1e5d1..ad3b890 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@@ -18,19 -18,14 +18,15 @@@ package org.apache.ignite.internal.processors.cache.transactions; import org.apache.ignite.*; - import org.apache.ignite.cache.*; import org.apache.ignite.internal.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.internal.processors.cache.*; - import org.apache.ignite.internal.processors.cache.version.*; - import org.apache.ignite.internal.util.*; - import org.apache.ignite.lang.*; - import org.apache.ignite.plugin.security.*; - import org.apache.ignite.portables.*; - import org.apache.ignite.transactions.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.dr.*; + import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.dr.*; + import org.apache.ignite.internal.transactions.*; + import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; @@@ -1907,8 -1876,8 +1926,8 @@@ public abstract class IgniteTxLocalAdap * @throws IgniteCheckedException If failed. */ private boolean filter(GridCacheEntryEx<K, V> cached, - IgnitePredicate<CacheEntry<K, V>>[] filter) throws IgniteCheckedException { + IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { - return pessimistic() || cached.context().isAll(cached, filter); + return pessimistic() || (optimistic() && implicit()) || cached.context().isAll(cached, filter); } /** @@@ -2482,8 -2487,7 +2501,8 @@@ @Nullable final Map<? extends K, GridCacheDrInfo<V>> drMap, final boolean retval, @Nullable GridCacheEntryEx<K, V> cached, - @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter - @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter) { ++ @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter + ) { assert filter == null || invokeMap == null; cacheCtx.checkSecurity(GridSecurityPermission.CACHE_PUT); @@@ -2737,12 -2717,9 +2756,12 @@@ @Nullable Map<? extends K, GridCacheVersion> drMap, @Nullable GridCacheEntryEx<K, V> cached, final boolean retval, - @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter) { + @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter) { cacheCtx.checkSecurity(GridSecurityPermission.CACHE_REMOVE); + if (retval) + needReturnValue(true); + final Collection<? extends K> keys0; if (drMap != null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 3459434,cb4c7c8..87708ed --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@@ -334,8 -355,8 +335,8 @@@ public class IgniteTxManager<K, V> exte * @return {@code True} if transaction has been committed or rolled back, * {@code false} otherwise. */ - public boolean isCompleted(IgniteTxEx<K, V> tx) { + public boolean isCompleted(IgniteInternalTx<K, V> tx) { - return committedVers.contains(tx.xidVersion()) || rolledbackVers.contains(tx.xidVersion()); + return completedVers.containsKey(tx.xidVersion()); } /** @@@ -735,10 -756,10 +736,10 @@@ boolean txSerializableEnabled = cctx.txConfig().isTxSerializableEnabled(); // Clean up committed transactions queue. - if (tx.pessimistic()) { + if (tx.pessimistic() && tx.local()) { if (tx.enforceSerializable() && txSerializableEnabled) { - for (Iterator<IgniteTxEx<K, V>> it = committedQ.iterator(); it.hasNext();) { - IgniteTxEx<K, V> committedTx = it.next(); + for (Iterator<IgniteInternalTx<K, V>> it = committedQ.iterator(); it.hasNext();) { + IgniteInternalTx<K, V> committedTx = it.next(); assert committedTx != tx; @@@ -969,8 -985,8 +970,8 @@@ /** * @param tx Tx to remove. */ - public void removeCommittedTx(IgniteTxEx<K, V> tx) { + public void removeCommittedTx(IgniteInternalTx<K, V> tx) { - committedVers.remove(tx.xidVersion()); + completedVers.remove(tx.xidVersion(), true); } /** @@@ -1167,10 -1181,10 +1168,10 @@@ throw new IgniteException("Missing commit version (consider increasing " + IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", firstVer=" + - committedVers.firstx() + ", lastVer=" + committedVers.lastx() + ", tx=" + tx.xid() + ']'); + completedVers.firstKey() + ", lastVer=" + completedVers.lastKey() + ", tx=" + tx.xid() + ']'); } - ConcurrentMap<GridCacheVersion, IgniteTxEx<K, V>> txIdMap = transactionMap(tx); + ConcurrentMap<GridCacheVersion, IgniteInternalTx<K, V>> txIdMap = transactionMap(tx); if (txIdMap.remove(tx.xidVersion(), tx)) { // 2. Must process completed entries before unlocking! @@@ -1183,9 -1197,11 +1184,8 @@@ } // 3.1 Call dataStructures manager. - for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) - cacheCtx.dataStructures().onTxCommitted(tx); + cctx.kernalContext().dataStructures().onTxCommitted(tx); - // 3.2 Add to pessimistic commit buffer if needed. - addPessimisticRecovery(tx); - // 4. Unlock write resources. if (tx.groupLock()) unlockGroupLocks(tx); @@@ -1497,9 -1513,9 +1497,9 @@@ * @return {@code True} if all keys were locked. * @throws IgniteCheckedException If lock has been cancelled. */ - private boolean lockMultiple(IgniteTxEx<K, V> tx, Iterable<IgniteTxEntry<K, V>> entries) + private boolean lockMultiple(IgniteInternalTx<K, V> tx, Iterable<IgniteTxEntry<K, V>> entries) throws IgniteCheckedException { - assert tx.optimistic(); + assert tx.optimistic() || !tx.local(); long remainingTime = U.currentTimeMillis() - (tx.startTime() + tx.timeout()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxProxyImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 3056a44,b60499c..a518074 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@@ -459,19 -358,15 +358,21 @@@ public abstract class GridCacheAbstract * @throws Exception In case of error. */ public void testGetAll() throws Exception { - IgniteTx tx = txEnabled() ? cache().txStart() : null; + IgniteTx tx = txEnabled() ? transactions().txStart() : null; + + final IgniteCache<String, Integer> cache = jcache(); - cache.put("key1", 1); - cache.put("key2", 2); + try { - cache().put("key1", 1); - cache().put("key2", 2); ++ cache.put("key1", 1); ++ cache.put("key2", 2); - if (tx != null) - tx.commit(); + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } GridTestUtils.assertThrows(log, new Callable<Void>() { @Override public Void call() throws Exception { @@@ -505,37 -400,31 +406,36 @@@ // Now do the same checks but within transaction. if (txEnabled()) { - tx = cache().txStart(); + tx = transactions().txStart(); - try { - assert cache().getAll(Collections.<String>emptyList()).isEmpty(); + assert cache.getAll(Collections.<String>emptySet()).isEmpty(); - map1 = cache().getAll(F.asList("key1", "key2", "key9999")); + map1 = cache.getAll(ImmutableSet.of("key1", "key2", "key9999")); - info("Retrieved map1: " + map1); + info("Retrieved map1: " + map1); - assert 2 == map1.size() : "Invalid map: " + map1; + assert 2 == map1.size() : "Invalid map: " + map1; - assertEquals(1, (int)map1.get("key1")); - assertEquals(2, (int)map1.get("key2")); - assertNull(map1.get("key9999")); + assertEquals(1, (int)map1.get("key1")); + assertEquals(2, (int)map1.get("key2")); + assertNull(map1.get("key9999")); - map2 = cache().getAll(F.asList("key1", "key2", "key9999")); + map2 = cache.getAll(ImmutableSet.of("key1", "key2", "key9999")); - info("Retrieved map2: " + map2); + info("Retrieved map2: " + map2); - assert 2 == map2.size() : "Invalid map: " + map2; + assert 2 == map2.size() : "Invalid map: " + map2; - assertEquals(1, (int)map2.get("key1")); - assertEquals(2, (int)map2.get("key2")); - assertNull(map2.get("key9999")); + assertEquals(1, (int)map2.get("key1")); + assertEquals(2, (int)map2.get("key2")); + assertNull(map2.get("key9999")); - tx.commit(); + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } } } @@@ -764,27 -531,29 +542,26 @@@ */ public void testPutTx() throws Exception { if (txEnabled()) { - try (IgniteTx tx = cache().txStart()) { - assert cache().put("key1", 1) == null; - assert cache().put("key2", 2) == null; - IgniteTx tx = transactions().txStart(); - + IgniteCache<String, Integer> cache = jcache(); ++ ++ try (IgniteTx tx = transactions().txStart()) { ++ assert cache.getAndPut("key1", 1) == null; ++ assert cache.getAndPut("key2", 2) == null; - assert cache.getAndPut("key1", 1) == null; - assert cache.getAndPut("key2", 2) == null; - - // Check inside transaction. - assert cache.get("key1") == 1; - assert cache.get("key2") == 2; + // Check inside transaction. - assert cache().get("key1") == 1; - assert cache().get("key2") == 2; ++ assert cache.get("key1") == 1; ++ assert cache.get("key2") == 2; - // Put again to check returned values. - assert cache.getAndPut("key1", 1) == 1; - assert cache.getAndPut("key2", 2) == 2; + // Put again to check returned values. - assert cache().put("key1", 1) == 1; - assert cache().put("key2", 2) == 2; - - checkContainsKey(true, "key1"); - checkContainsKey(true, "key2"); - - assert cache().get("key1") != null; - assert cache().get("key2") != null; - assert cache().get("wrong") == null; ++ assert cache.getAndPut("key1", 1) == 1; ++ assert cache.getAndPut("key2", 2) == 2; - checkContainsKey(true, "key1"); - checkContainsKey(true, "key2"); - - assert cache.get("key1") != null; - assert cache.get("key2") != null; - assert cache.get("wrong") == null; - - tx.commit(); ++ assert cache.get("key1") != null; ++ assert cache.get("key2") != null; ++ assert cache.get("wrong") == null; ++ + tx.commit(); + } // Check outside transaction. checkContainsKey(true, "key1"); @@@ -1429,25 -1140,20 +1158,26 @@@ * @param inTx Whether to start transaction. * @throws Exception If failed. */ - private void checkPutx(boolean inTx) throws Exception { - IgniteTx tx = inTx ? cache().txStart() : null; + private void checkPut(boolean inTx) throws Exception { + IgniteTx tx = inTx ? transactions().txStart() : null; - try { - assert cache().putx("key1", 1); - assert cache().putx("key2", 2); - assert !cache().putx("wrong", 3, gte100); + IgniteCache<String, Integer> cache = jcache(); - cache.put("key1", 1); - cache.put("key2", 2); - - // Check inside transaction. - assert cache.get("key1") == 1; - assert cache.get("key2") == 2; ++ try { ++ cache.put("key1", 1); ++ cache.put("key2", 2); ++ + // Check inside transaction. - assert cache().get("key1") == 1; - assert cache().get("key2") == 2; ++ assert cache.get("key1") == 1; ++ assert cache.get("key2") == 2; - if (tx != null) - tx.commit(); + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } checkSize(F.asSet("key1", "key2")); @@@ -1456,37 -1162,48 +1186,54 @@@ checkContainsKey(true, "key2"); checkContainsKey(false, "wrong"); - checkContainsValue(true, 1); - checkContainsValue(true, 2); + assert cache.get("key1") == 1; + assert cache.get("key2") == 2; + assert cache.get("wrong") == null; + } - assert cache().get("key1") == 1; - assert cache().get("key2") == 2; - assert cache().get("wrong") == null; + /** + * @throws Exception If failed. + */ + public void testPutAsync() throws Exception { + IgniteTx tx = txEnabled() ? transactions().txStart() : null; - assert cache().putx("key1", 100, F.<String, Integer>cacheContainsPeek(1)); - assert cache().putx("key1", 101, gte100); - assert !cache().putx("key1", 102, gte200); + IgniteCache<String, Integer> cacheAsync = jcache().withAsync(); - checkContainsValue(false, 1); - checkContainsValue(true, 101); - checkContainsValue(true, 2); - jcache().put("key2", 1); - - cacheAsync.put("key1", 10); - - IgniteFuture<?> fut1 = cacheAsync.future(); - - cacheAsync.put("key2", 11); - - IgniteFuture<?> fut2 = cacheAsync.future(); - - IgniteFuture<IgniteTx> f = null; - - if (tx != null) { - tx = (IgniteTx)tx.withAsync(); ++ try { ++ jcache().put("key2", 1); ++ ++ cacheAsync.put("key1", 10); ++ ++ IgniteFuture<?> fut1 = cacheAsync.future(); ++ ++ cacheAsync.put("key2", 11); ++ ++ IgniteFuture<?> fut2 = cacheAsync.future(); ++ ++ IgniteFuture<IgniteTx> f = null; ++ ++ if (tx != null) { ++ tx = (IgniteTx)tx.withAsync(); ++ ++ tx.commit(); - checkSize(F.asSet("key1", "key2")); - tx.commit(); ++ f = tx.future(); ++ } ++ ++ fut1.get(); ++ fut2.get(); - checkContainsKey(true, "key1"); - checkContainsKey(true, "key2"); - checkContainsKey(false, "wrong"); - f = tx.future(); ++ assert f == null || f.get().state() == COMMITTED; ++ } ++ finally { ++ if (tx != null) ++ tx.close(); + } - - fut1.get(); - fut2.get(); - - assert f == null || f.get().state() == COMMITTED; - assert cache().get("key1") == 101; - assert cache().get("key2") == 2; - assert cache().get("wrong") == null; - } + checkSize(F.asSet("key1", "key2")); - /** - * @throws Exception If failed. - */ - public void testFiltersOptimistic1() throws Exception { - checkFilters1(OPTIMISTIC, REPEATABLE_READ); + assert jcache().get("key1") == 10; + assert jcache().get("key2") == 11; } /** @@@ -5392,11 -3774,9 +3804,11 @@@ assertFalse(cache.iterator().hasNext()); - final int SIZE = 5000; ++ final int SIZE = 20000; + Map<String, Integer> entries = new HashMap<>(); - for (int i = 0; i < 20000; ++i) { + for (int i = 0; i < SIZE; ++i) { cache.put(Integer.toString(i), i); entries.put(Integer.toString(i), i); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java index 2c82cd6,ae7d871..e0569ed --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java @@@ -19,12 -19,11 +19,13 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.*; import org.apache.ignite.cache.*; + import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.marshaller.optimized.*; import org.apache.ignite.resources.*; - import org.apache.ignite.transactions.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; @@@ -163,22 -162,12 +164,21 @@@ public abstract class GridCacheAbstract fut.get(); // Wait for completion. for (int i = 0; i < GRID_CNT; i++) { + info("Running iteration: " + i); + + for (int g = 0; g < GRID_CNT; g++) { + info("Will check grid: " + g); + + GridCacheEntryEx<Object, Object> testEntry = ((IgniteKernal)grid(i)).internalCache(null).peekEx("TestKey"); + + info("Entry: " + testEntry); + } + - CacheProjection<String, int[]> c = grid(i).cache(null).projection(String.class, int[].class); + IgniteCache<String, int[]> c = grid(i).jcache(null); // Do within transaction to make sure that lock is acquired // which means that all previous transactions have committed. -- - try (IgniteTx tx = c.txStart(concur, isolation)) { + try (IgniteTx tx = grid(i).transactions().txStart(concur, isolation)) { int[] arr = c.get("TestKey"); assertNotNull(arr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEventAbstractTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearMetricsSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearMetricsSelfTest.java index 2f71741,97b95c9..0086311 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearMetricsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearMetricsSelfTest.java @@@ -176,26 -179,27 +179,27 @@@ public class GridCacheNearMetricsSelfTe for (int j = 0; j < gridCount(); j++) { Ignite g = grid(j); + IgniteCache<Object, Object> jcache = g.jcache(null); - if (g.cache(null).affinity().isPrimaryOrBackup(g.cluster().localNode(), key)) - assertEquals(1, g.cache(null).metrics().getCachePuts()); + if (affinity(jcache).isPrimaryOrBackup(g.cluster().localNode(), key)) + assertEquals(1, jcache.metrics().getCachePuts()); else - assertEquals(0, g.cache(null).metrics().getCachePuts()); + assertEquals(0, jcache.metrics().getCachePuts()); - if (g.cache(null).affinity().isPrimary(g.cluster().localNode(), key)) { - assertEquals(1, g.cache(null).metrics().getCacheGets()); - assertEquals(0, g.cache(null).metrics().getCacheHits()); - assertEquals(1, g.cache(null).metrics().getCacheMisses()); + if (affinity(jcache).isPrimary(g.cluster().localNode(), key)) { + assertEquals(1, jcache.metrics().getCacheGets()); + assertEquals(0, jcache.metrics().getCacheHits()); + assertEquals(1, jcache.metrics().getCacheMisses()); } - else if (g.cache(null).affinity().isBackup(g.cluster().localNode(), key)){ - assertEquals(1, g.cache(null).metrics().getCacheGets()); - assertEquals(1, g.cache(null).metrics().getCacheHits()); - assertEquals(0, g.cache(null).metrics().getCacheMisses()); + else if (affinity(jcache).isBackup(g.cluster().localNode(), key)){ - assertEquals(2, jcache.metrics().getCacheGets()); ++ assertEquals(1, jcache.metrics().getCacheGets()); + assertEquals(1, jcache.metrics().getCacheHits()); - assertEquals(1, jcache.metrics().getCacheMisses()); ++ assertEquals(0, jcache.metrics().getCacheMisses()); } else { - assertEquals(0, g.cache(null).metrics().getCacheGets()); - assertEquals(0, g.cache(null).metrics().getCacheHits()); - assertEquals(0, g.cache(null).metrics().getCacheMisses()); + assertEquals(0, jcache.metrics().getCacheGets()); + assertEquals(0, jcache.metrics().getCacheHits()); + assertEquals(0, jcache.metrics().getCacheMisses()); } } } @@@ -230,22 -234,24 +234,24 @@@ for (int j = 0; j < gridCount(); j++) { Ignite g = grid(j); - assertEquals(1, g.cache(null).metrics().getCachePuts()); + IgniteCache<Object, Object> jcache = g.jcache(null); + + assertEquals(1, jcache.metrics().getCachePuts()); - if (g.cache(null).affinity().isPrimary(g.cluster().localNode(), key)) { - assertEquals(1, g.cache(null).metrics().getCacheGets()); - assertEquals(0, g.cache(null).metrics().getCacheHits()); - assertEquals(1, g.cache(null).metrics().getCacheMisses()); + if (affinity(jcache).isPrimary(g.cluster().localNode(), key)) { + assertEquals(1, jcache.metrics().getCacheGets()); + assertEquals(0, jcache.metrics().getCacheHits()); + assertEquals(1, jcache.metrics().getCacheMisses()); } - else if (g.cache(null).affinity().isBackup(g.cluster().localNode(), key)){ - assertEquals(0, g.cache(null).metrics().getCacheGets()); - assertEquals(0, g.cache(null).metrics().getCacheHits()); - assertEquals(0, g.cache(null).metrics().getCacheMisses()); + else if (affinity(jcache).isBackup(g.cluster().localNode(), key)){ + assertEquals(0, jcache.metrics().getCacheGets()); + assertEquals(0, jcache.metrics().getCacheHits()); + assertEquals(0, jcache.metrics().getCacheMisses()); } else { - assertEquals(1, g.cache(null).metrics().getCacheGets()); - assertEquals(1, g.cache(null).metrics().getCacheHits()); - assertEquals(0, g.cache(null).metrics().getCacheMisses()); - assertEquals(2, jcache.metrics().getCacheGets()); ++ assertEquals(1, jcache.metrics().getCacheGets()); + assertEquals(1, jcache.metrics().getCacheHits()); - assertEquals(1, jcache.metrics().getCacheMisses()); ++ assertEquals(0, jcache.metrics().getCacheMisses()); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java index 9bface4,9c87268..1a5b3f2 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java @@@ -188,7 -188,7 +188,7 @@@ public class GridCacheNearReadersSelfTe // Node 1 still has node2 in readers map. assertTrue(e1.readers().contains(n2.id())); - assertNotNull(cache1.put(1, "z1")); - assertNotNull((cache1.getAndPut(1, "z1"))); ++ assertNotNull(cache1.getAndPut(1, "z1")); // Node 1 still has node2 in readers map. assertFalse(e1.readers().contains(n2.id())); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxPessimisticOriginatingNodeFailureSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedTxPessimisticOriginatingNodeFailureSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java index 4179310,59e8bc4..01d35e7 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java @@@ -885,11 -1026,7 +1026,11 @@@ public abstract class IgniteCacheExpiry }, 3000); } - boolean primary = cache.entry(key).primary(); - boolean backup = cache.entry(key).backup(); - assertEquals("Unexpected ttl [node=" + i + ", key=" + key +']', ttl, e.ttl()); ++ boolean primary = cache.affinity().isPrimary(grid.localNode(), key); ++ boolean backup = cache.affinity().isBackup(grid.localNode(), key); + + assertEquals("Unexpected ttl [grid=" + i + ", key=" + key + ", e=" + e + + ", primary=" + primary + ", backup=" + backup + ']', ttl, e.ttl()); if (ttl > 0) assertTrue(e.expireTime() > 0); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index bfe9a56,fc14ac9..1a10e30 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@@ -66,9 -67,12 +67,15 @@@ public class IgniteCacheTestSuite exten suite.addTestSuite(IgniteCacheTxLocalInvokeTest.class); suite.addTestSuite(IgniteCrossCacheTxStoreSelfTest.class); + suite.addTestSuite(IgnitePutAllLargeBatchSelfTest.class); + suite.addTestSuite(IgnitePutAllUpdateNonPreloadedPartitionSelfTest.class); + + // User's class loader tests. + suite.addTestSuite(IgniteCacheAtomicExecutionContextTest.class); + suite.addTestSuite(IgniteCachePartitionedExecutionContextTest.class); + suite.addTestSuite(IgniteCacheReplicatedExecutionContextTest.class); + suite.addTestSuite(IgniteCacheTxExecutionContextTest.class); + // Affinity tests. suite.addTestSuite(GridCachePartitionFairAffinityNodesSelfTest.class); suite.addTestSuite(GridCacheAffinityBackupsSelfTest.class);