http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 17ca50c..d113cda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -100,8 +100,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte for (Map.Entry<Long, GridFutureAdapter<QueryResult<K, V>>> entry : futs.entrySet()) { final Object recipient = recipient(nodeId, entry.getKey()); - entry.getValue().listenAsync(new CIX1<IgniteFuture<QueryResult<K, V>>>() { - @Override public void applyx(IgniteFuture<QueryResult<K, V>> f) throws IgniteCheckedException { + entry.getValue().listenAsync(new CIX1<IgniteInternalFuture<QueryResult<K, V>>>() { + @Override public void applyx(IgniteInternalFuture<QueryResult<K, V>> f) throws IgniteCheckedException { f.get().closeIfNotShared(recipient); } }); @@ -114,8 +114,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte for (Map.Entry<Long, GridFutureAdapter<FieldsResult>> entry : fieldsFuts.entrySet()) { final Object recipient = recipient(nodeId, entry.getKey()); - entry.getValue().listenAsync(new CIX1<IgniteFuture<FieldsResult>>() { - @Override public void applyx(IgniteFuture<FieldsResult> f) + entry.getValue().listenAsync(new CIX1<IgniteInternalFuture<FieldsResult>>() { + @Override public void applyx(IgniteInternalFuture<FieldsResult> f) throws IgniteCheckedException { f.get().closeIfNotShared(recipient); } @@ -186,7 +186,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param valType Value type. * @return Future that will be completed when rebuilding of all indexes is finished. */ - public IgniteFuture<?> rebuildIndexes(Class<?> valType) { + public IgniteInternalFuture<?> rebuildIndexes(Class<?> valType) { return rebuildIndexes(valType.getName()); } @@ -196,7 +196,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param typeName Value type name. * @return Future that will be completed when rebuilding of all indexes is finished. */ - public IgniteFuture<?> rebuildIndexes(String typeName) { + public IgniteInternalFuture<?> rebuildIndexes(String typeName) { if (!enterBusy()) throw new IllegalStateException("Failed to rebuild indexes (grid is stopping)."); @@ -213,7 +213,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * * @return Future that will be completed when rebuilding of all indexes is finished. */ - public IgniteFuture<?> rebuildAllIndexes() { + public IgniteInternalFuture<?> rebuildAllIndexes() { if (!enterBusy()) throw new IllegalStateException("Failed to rebuild indexes (grid is stopping)."); @@ -1481,7 +1481,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte Map<Long, GridFutureAdapter<QueryResult<K, V>>> futs = qryIters.get(sndId); if (futs != null) { - IgniteFuture<QueryResult<K, V>> fut; + IgniteInternalFuture<QueryResult<K, V>> fut; synchronized (futs) { fut = futs.remove(reqId); @@ -1607,7 +1607,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte Map<Long, GridFutureAdapter<FieldsResult>> futs = fieldsQryRes.get(sndId); if (futs != null) { - IgniteFuture<FieldsResult> fut; + IgniteInternalFuture<FieldsResult> fut; synchronized (futs) { fut = futs.remove(reqId); @@ -1701,14 +1701,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte Collection<Collection<CacheSqlMetadata>> res = new ArrayList<>(nodes.size() + 1); - IgniteFuture<Collection<Collection<CacheSqlMetadata>>> rmtFut = null; + IgniteInternalFuture<Collection<Collection<CacheSqlMetadata>>> rmtFut = null; // Get metadata from remote nodes. if (!nodes.isEmpty()) rmtFut = cctx.closures().callAsyncNoFailover(BROADCAST, F.asSet(job), nodes, true); // Get local metadata. - IgniteFuture<Collection<CacheSqlMetadata>> locFut = cctx.closures().callLocalSafe(job, true); + IgniteInternalFuture<Collection<CacheSqlMetadata>> locFut = cctx.closures().callLocalSafe(job, true); if (rmtFut != null) res.addAll(rmtFut.get()); @@ -2610,7 +2610,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ - @Override public IgniteFuture<V> reloadAsync() { + @Override public IgniteInternalFuture<V> reloadAsync() { throw new UnsupportedOperationException(); } @@ -2652,7 +2652,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ - @Override public IgniteFuture<V> getAsync() { + @Override public IgniteInternalFuture<V> getAsync() { return new GridFinishedFuture<V>(cctx.kernalContext(), getValue()); } @@ -2667,7 +2667,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ - @Override public IgniteFuture<V> setAsync(V val, IgnitePredicate<CacheEntry<K, V>>... filter) { + @Override public IgniteInternalFuture<V> setAsync(V val, IgnitePredicate<CacheEntry<K, V>>... filter) { throw new UnsupportedOperationException(); } @@ -2677,7 +2677,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ - @Override public IgniteFuture<V> setIfAbsentAsync(V val) { + @Override public IgniteInternalFuture<V> setIfAbsentAsync(V val) { throw new UnsupportedOperationException(); } @@ -2687,7 +2687,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> setxAsync(V val, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { + @Override public IgniteInternalFuture<Boolean> setxAsync(V val, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { throw new UnsupportedOperationException(); } @@ -2697,7 +2697,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> setxIfAbsentAsync(V val) { + @Override public IgniteInternalFuture<Boolean> setxIfAbsentAsync(V val) { throw new UnsupportedOperationException(); } @@ -2707,7 +2707,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ - @Override public IgniteFuture<V> replaceAsync(V val) { + @Override public IgniteInternalFuture<V> replaceAsync(V val) { throw new UnsupportedOperationException(); } @@ -2717,7 +2717,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> replacexAsync(V val) { + @Override public IgniteInternalFuture<Boolean> replacexAsync(V val) { throw new UnsupportedOperationException(); } @@ -2727,7 +2727,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> replaceAsync(V oldVal, V newVal) { + @Override public IgniteInternalFuture<Boolean> replaceAsync(V oldVal, V newVal) { throw new UnsupportedOperationException(); } @@ -2737,7 +2737,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ - @Override public IgniteFuture<V> removeAsync(IgnitePredicate<CacheEntry<K, V>>... filter) { + @Override public IgniteInternalFuture<V> removeAsync(IgnitePredicate<CacheEntry<K, V>>... filter) { throw new UnsupportedOperationException(); } @@ -2747,7 +2747,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> removexAsync(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { + @Override public IgniteInternalFuture<Boolean> removexAsync(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { throw new UnsupportedOperationException(); } @@ -2757,7 +2757,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> removeAsync(V val) { + @Override public IgniteInternalFuture<Boolean> removeAsync(V val) { throw new UnsupportedOperationException(); } @@ -2782,7 +2782,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> lockAsync(long timeout, + @Override public IgniteInternalFuture<Boolean> lockAsync(long timeout, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { throw new UnsupportedOperationException(); }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetadataAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetadataAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetadataAware.java index f6cc15d..e47f845 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetadataAware.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetadataAware.java @@ -17,8 +17,8 @@ package org.apache.ignite.internal.processors.cache.query; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.query.*; -import org.apache.ignite.lang.*; import java.util.*; @@ -29,5 +29,5 @@ public interface GridCacheQueryMetadataAware { /** * @return Future to retrieve metadata. */ - public IgniteFuture<List<GridQueryFieldMetadata>> metadata(); + public IgniteInternalFuture<List<GridQueryFieldMetadata>> metadata(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java index f4084a2..5355cb1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; @@ -290,7 +291,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements CacheEntry<K, V>, Gr } /** {@inheritDoc} */ - @Override public IgniteFuture<V> getAsync() { + @Override public IgniteInternalFuture<V> getAsync() { assert impl != null; ctx.denyOnFlag(LOCAL); @@ -308,7 +309,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements CacheEntry<K, V>, Gr } /** {@inheritDoc} */ - @Override public IgniteFuture<V> reloadAsync() { + @Override public IgniteInternalFuture<V> reloadAsync() { assert impl != null; ctx.denyOnFlag(READ); @@ -390,7 +391,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements CacheEntry<K, V>, Gr } /** {@inheritDoc} */ - @Override public IgniteFuture<V> setAsync(V val, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { + @Override public IgniteInternalFuture<V> setAsync(V val, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { assert impl != null; ctx.denyOnFlag(READ); @@ -408,7 +409,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements CacheEntry<K, V>, Gr } /** {@inheritDoc} */ - @Override public IgniteFuture<V> setIfAbsentAsync(V val) { + @Override public IgniteInternalFuture<V> setIfAbsentAsync(V val) { assert impl != null; ctx.denyOnFlag(READ); @@ -427,7 +428,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements CacheEntry<K, V>, Gr } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> setxAsync(V val, + @Override public IgniteInternalFuture<Boolean> setxAsync(V val, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { assert impl != null; @@ -446,7 +447,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements CacheEntry<K, V>, Gr } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> setxIfAbsentAsync(V val) { + @Override public IgniteInternalFuture<Boolean> setxIfAbsentAsync(V val) { assert impl != null; ctx.denyOnFlag(READ); @@ -464,7 +465,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements CacheEntry<K, V>, Gr } /** {@inheritDoc} */ - @Override public IgniteFuture<V> replaceAsync(V val) { + @Override public IgniteInternalFuture<V> replaceAsync(V val) { assert impl != null; ctx.denyOnFlag(READ); @@ -482,7 +483,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements CacheEntry<K, V>, Gr } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> replacexAsync(V val) { + @Override public IgniteInternalFuture<Boolean> replacexAsync(V val) { assert impl != null; ctx.denyOnFlag(READ); @@ -500,7 +501,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements CacheEntry<K, V>, Gr } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> replaceAsync(V oldVal, V newVal) { + @Override public IgniteInternalFuture<Boolean> replaceAsync(V oldVal, V newVal) { assert impl != null; ctx.denyOnFlag(READ); @@ -519,7 +520,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements CacheEntry<K, V>, Gr } /** {@inheritDoc} */ - @Override public IgniteFuture<V> removeAsync(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { + @Override public IgniteInternalFuture<V> removeAsync(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { assert impl != null; ctx.denyOnFlag(READ); @@ -537,7 +538,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements CacheEntry<K, V>, Gr } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> removexAsync(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { + @Override public IgniteInternalFuture<Boolean> removexAsync(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { assert impl != null; ctx.denyOnFlag(READ); @@ -555,7 +556,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements CacheEntry<K, V>, Gr } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> removeAsync(V val) { + @Override public IgniteInternalFuture<Boolean> removeAsync(V val) { assert impl != null; ctx.denyOnFlag(READ); @@ -600,7 +601,7 @@ public class GridCacheContinuousQueryEntry<K, V> implements CacheEntry<K, V>, Gr } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> lockAsync(long timeout, + @Override public IgniteInternalFuture<Boolean> lockAsync(long timeout, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { assert impl != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java index 32e11ea..de7416e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java @@ -21,7 +21,6 @@ import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -221,7 +220,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { if (ctx.hasFlag(SYNC_COMMIT)) tx0.syncCommit(true); - IgniteFuture<?> lockFut = tx0.groupLockAsync(ctx, (Collection)F.asList(grpLockKey)); + IgniteInternalFuture<?> lockFut = tx0.groupLockAsync(ctx, (Collection)F.asList(grpLockKey)); try { lockFut.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index e079a5c..10843ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; @@ -557,7 +558,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter } /** {@inheritDoc} */ - @Override public <R> IgniteFuture<R> future() { + @Override public <R> IgniteInternalFuture<R> future() { throw new UnsupportedOperationException("future() should not be called on IgniteTxAdapter directly."); } @@ -949,7 +950,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter /** {@inheritDoc} */ @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") - @Override public IgniteFuture<IgniteTx> finishFuture() { + @Override public IgniteInternalFuture<IgniteTx> finishFuture() { GridFutureAdapter<IgniteTx> fut = finFut.get(); if (fut == null) { @@ -1569,7 +1570,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter } /** {@inheritDoc} */ - @Override public <R> IgniteFuture<R> future() { + @Override public <R> IgniteInternalFuture<R> future() { throw new IllegalStateException("Deserialized transaction can only be used as read-only."); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java index c3961a5..63e4786 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.lang.*; @@ -382,7 +383,7 @@ public interface IgniteTxEx<K, V> extends IgniteTx, GridTimeoutObject { * * @return Future for prepare step. */ - public IgniteFuture<IgniteTxEx<K, V>> prepareAsync(); + public IgniteInternalFuture<IgniteTxEx<K, V>> prepareAsync(); /** * @param endVer End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>) @@ -406,7 +407,7 @@ public interface IgniteTxEx<K, V> extends IgniteTx, GridTimeoutObject { /** * @return Future for transaction completion. */ - public IgniteFuture<IgniteTx> finishFuture(); + public IgniteInternalFuture<IgniteTx> finishFuture(); /** * @param state Transaction state. @@ -434,14 +435,14 @@ public interface IgniteTxEx<K, V> extends IgniteTx, GridTimeoutObject { * * @return Rollback future. */ - public IgniteFuture<IgniteTx> rollbackAsync(); + public IgniteInternalFuture<IgniteTx> rollbackAsync(); /** * Asynchronously commits this transaction by initiating {@code two-phase-commit} process. * * @return Future for commit operation. */ - public IgniteFuture<IgniteTx> commitAsync(); + public IgniteInternalFuture<IgniteTx> commitAsync(); /** * Callback invoked whenever there is a lock that has been acquired http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index d539f15..fb94cd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -19,11 +19,11 @@ package org.apache.ignite.internal.processors.cache.transactions; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; @@ -52,7 +52,7 @@ public class IgniteTxHandler<K, V> { /** Shared cache context. */ private GridCacheSharedContext<K, V> ctx; - public IgniteFuture<IgniteTxEx<K, V>> processNearTxPrepareRequest(final UUID nearNodeId, + public IgniteInternalFuture<IgniteTxEx<K, V>> processNearTxPrepareRequest(final UUID nearNodeId, final GridNearTxPrepareRequest<K, V> req) { return prepareTx(nearNodeId, null, req); } @@ -148,7 +148,7 @@ public class IgniteTxHandler<K, V> { * @param req Near prepare request. * @return Future for transaction. */ - public IgniteFuture<IgniteTxEx<K, V>> prepareTx(final UUID nearNodeId, @Nullable GridNearTxLocal<K, V> locTx, + public IgniteInternalFuture<IgniteTxEx<K, V>> prepareTx(final UUID nearNodeId, @Nullable GridNearTxLocal<K, V> locTx, final GridNearTxPrepareRequest<K, V> req) { assert nearNodeId != null; assert req != null; @@ -174,20 +174,20 @@ public class IgniteTxHandler<K, V> { * @param req Near prepare request. * @return Prepare future. */ - private IgniteFuture<IgniteTxEx<K, V>> prepareColocatedTx(final GridNearTxLocal<K, V> locTx, + private IgniteInternalFuture<IgniteTxEx<K, V>> prepareColocatedTx(final GridNearTxLocal<K, V> locTx, final GridNearTxPrepareRequest<K, V> req) { - IgniteFuture<Object> fut = new GridFinishedFutureEx<>(); // TODO force preload keys. + IgniteInternalFuture<Object> fut = new GridFinishedFutureEx<>(); // TODO force preload keys. return new GridEmbeddedFuture<>( ctx.kernalContext(), fut, - new C2<Object, Exception, IgniteFuture<IgniteTxEx<K, V>>>() { - @Override public IgniteFuture<IgniteTxEx<K, V>> apply(Object o, Exception ex) { + new C2<Object, Exception, IgniteInternalFuture<IgniteTxEx<K, V>>>() { + @Override public IgniteInternalFuture<IgniteTxEx<K, V>> apply(Object o, Exception ex) { if (ex != null) throw new GridClosureException(ex); - IgniteFuture<IgniteTxEx<K, V>> fut = locTx.prepareAsyncLocal(req.reads(), req.writes(), + IgniteInternalFuture<IgniteTxEx<K, V>> fut = locTx.prepareAsyncLocal(req.reads(), req.writes(), req.transactionNodes(), req.last(), req.lastBackups()); if (locTx.isRollbackOnly()) @@ -220,7 +220,7 @@ public class IgniteTxHandler<K, V> { * @param req Near prepare request. * @return Prepare future. */ - private IgniteFuture<IgniteTxEx<K, V>> prepareNearTx(final UUID nearNodeId, + private IgniteInternalFuture<IgniteTxEx<K, V>> prepareNearTx(final UUID nearNodeId, final GridNearTxPrepareRequest<K, V> req) { ClusterNode nearNode = ctx.node(nearNodeId); @@ -284,7 +284,7 @@ public class IgniteTxHandler<K, V> { } if (tx != null) { - IgniteFuture<IgniteTxEx<K, V>> fut = tx.prepareAsync(req.reads(), req.writes(), + IgniteInternalFuture<IgniteTxEx<K, V>> fut = tx.prepareAsync(req.reads(), req.writes(), req.dhtVersions(), req.messageId(), req.miniId(), req.transactionNodes(), req.last(), req.lastBackups()); @@ -299,8 +299,8 @@ public class IgniteTxHandler<K, V> { final GridDhtTxLocal<K, V> tx0 = tx; - fut.listenAsync(new CI1<IgniteFuture<IgniteTxEx<K, V>>>() { - @Override public void apply(IgniteFuture<IgniteTxEx<K, V>> txFut) { + fut.listenAsync(new CI1<IgniteInternalFuture<IgniteTxEx<K, V>>>() { + @Override public void apply(IgniteInternalFuture<IgniteTxEx<K, V>> txFut) { try { txFut.get(); } @@ -401,7 +401,7 @@ public class IgniteTxHandler<K, V> { * @param req Request. * @return Future. */ - @Nullable public IgniteFuture<IgniteTx> processNearTxFinishRequest(UUID nodeId, GridNearTxFinishRequest<K, V> req) { + @Nullable public IgniteInternalFuture<IgniteTx> processNearTxFinishRequest(UUID nodeId, GridNearTxFinishRequest<K, V> req) { return finish(nodeId, null, req); } @@ -410,7 +410,7 @@ public class IgniteTxHandler<K, V> { * @param req Request. * @return Future. */ - @Nullable public IgniteFuture<IgniteTx> finish(UUID nodeId, @Nullable GridNearTxLocal<K, V> locTx, + @Nullable public IgniteInternalFuture<IgniteTx> finish(UUID nodeId, @Nullable GridNearTxLocal<K, V> locTx, GridNearTxFinishRequest<K, V> req) { assert nodeId != null; assert req != null; @@ -422,12 +422,12 @@ public class IgniteTxHandler<K, V> { if (log.isDebugEnabled()) log.debug("Processing near tx finish request [nodeId=" + nodeId + ", req=" + req + "]"); - IgniteFuture<IgniteTx> colocatedFinishFut = null; + IgniteInternalFuture<IgniteTx> colocatedFinishFut = null; if (locTx != null && locTx.colocatedLocallyMapped()) colocatedFinishFut = finishColocatedLocal(req.commit(), locTx); - IgniteFuture<IgniteTx> nearFinishFut = null; + IgniteInternalFuture<IgniteTx> nearFinishFut = null; if (locTx == null || locTx.nearLocallyMapped()) { if (locTx != null) @@ -459,7 +459,7 @@ public class IgniteTxHandler<K, V> { * @param req Finish request. * @return Finish future. */ - private IgniteFuture<IgniteTx> finishDhtLocal(UUID nodeId, @Nullable GridNearTxLocal<K, V> locTx, + private IgniteInternalFuture<IgniteTx> finishDhtLocal(UUID nodeId, @Nullable GridNearTxLocal<K, V> locTx, GridNearTxFinishRequest<K, V> req) { GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version()); @@ -563,7 +563,7 @@ public class IgniteTxHandler<K, V> { if (tx.pessimistic()) tx.prepare(); - IgniteFuture<IgniteTx> commitFut = tx.commitAsync(); + IgniteInternalFuture<IgniteTx> commitFut = tx.commitAsync(); // Only for error logging. commitFut.listenAsync(CU.errorLogger(log)); @@ -579,7 +579,7 @@ public class IgniteTxHandler<K, V> { tx.nearFinishFutureId(req.futureId()); tx.nearFinishMiniId(req.miniId()); - IgniteFuture<IgniteTx> rollbackFut = tx.rollbackAsync(); + IgniteInternalFuture<IgniteTx> rollbackFut = tx.rollbackAsync(); // Only for error logging. rollbackFut.listenAsync(CU.errorLogger(log)); @@ -591,7 +591,7 @@ public class IgniteTxHandler<K, V> { U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e); if (tx != null) { - IgniteFuture<IgniteTx> rollbackFut = tx.rollbackAsync(); + IgniteInternalFuture<IgniteTx> rollbackFut = tx.rollbackAsync(); // Only for error logging. rollbackFut.listenAsync(CU.errorLogger(log)); @@ -608,7 +608,7 @@ public class IgniteTxHandler<K, V> { * @param tx Transaction to commit. * @return Future. */ - public IgniteFuture<IgniteTx> finishColocatedLocal(boolean commit, GridNearTxLocal<K, V> tx) { + public IgniteInternalFuture<IgniteTx> finishColocatedLocal(boolean commit, GridNearTxLocal<K, V> tx) { try { if (commit) { if (!tx.markFinalizing(USER_FINISH)) { @@ -1448,10 +1448,10 @@ public class IgniteTxHandler<K, V> { if (log.isDebugEnabled()) log.debug("Processing check committed transaction request [nodeId=" + nodeId + ", req=" + req + ']'); - IgniteFuture<GridCacheCommittedTxInfo<K, V>> infoFut = ctx.tm().checkPessimisticTxCommitted(req); + IgniteInternalFuture<GridCacheCommittedTxInfo<K, V>> infoFut = ctx.tm().checkPessimisticTxCommitted(req); - infoFut.listenAsync(new CI1<IgniteFuture<GridCacheCommittedTxInfo<K, V>>>() { - @Override public void apply(IgniteFuture<GridCacheCommittedTxInfo<K, V>> infoFut) { + infoFut.listenAsync(new CI1<IgniteInternalFuture<GridCacheCommittedTxInfo<K, V>>>() { + @Override public void apply(IgniteInternalFuture<GridCacheCommittedTxInfo<K, V>> infoFut) { GridCacheCommittedTxInfo<K, V> info = null; try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 90e09d1..502c058 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; @@ -304,7 +305,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> loadMissing( + @Override public IgniteInternalFuture<Boolean> loadMissing( final GridCacheContext<K, V> cacheCtx, final boolean readThrough, boolean async, @@ -1332,7 +1333,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * @param filter Filter. * @return Loaded key-value pairs. */ - private IgniteFuture<Map<K, V>> checkMissed( + private IgniteInternalFuture<Map<K, V>> checkMissed( final GridCacheContext<K, V> cacheCtx, final Map<K, V> map, final Map<K, GridCacheVersion> missedMap, @@ -1517,7 +1518,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } /** {@inheritDoc} */ - @Override public IgniteFuture<Map<K, V>> getAllAsync( + @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( final GridCacheContext<K, V> cacheCtx, Collection<? extends K> keys, @Nullable GridCacheEntryEx<K, V> cached, @@ -1563,7 +1564,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : -1L; - IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys, + IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys, lockTimeout(), this, true, @@ -1574,7 +1575,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> CU.<K, V>empty()); PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() { - @Override public IgniteFuture<Map<K, V>> postLock() throws IgniteCheckedException { + @Override public IgniteInternalFuture<Map<K, V>> postLock() throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Acquired transaction lock for read on keys: " + lockKeys); @@ -1676,7 +1677,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (fut.isDone()) { try { - IgniteFuture<Map<K, V>> fut1 = plc2.apply(fut.get(), null); + IgniteInternalFuture<Map<K, V>> fut1 = plc2.apply(fut.get(), null); return fut1.isDone() ? new GridFinishedFutureEx<>(finClos.apply(fut1.get(), null)) : @@ -1722,7 +1723,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> checkMissed(cacheCtx, retMap, missed, redos, deserializePortable, filter), // Closure that returns another future, based on result from first. new PMC<Map<K, V>>() { - @Override public IgniteFuture<Map<K, V>> postMiss(Map<K, V> map) { + @Override public IgniteInternalFuture<Map<K, V>> postMiss(Map<K, V> map) { if (redos.isEmpty()) return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap()); @@ -1769,7 +1770,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteFuture<GridCacheReturn<V>> putAllAsync( + @Override public IgniteInternalFuture<GridCacheReturn<V>> putAllAsync( GridCacheContext<K, V> cacheCtx, Map<? extends K, ? extends V> map, boolean retval, @@ -1777,7 +1778,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> long ttl, IgnitePredicate<CacheEntry<K, V>>[] filter ) { - return (IgniteFuture<GridCacheReturn<V>>)putAllAsync0(cacheCtx, + return (IgniteInternalFuture<GridCacheReturn<V>>)putAllAsync0(cacheCtx, map, null, null, @@ -1788,7 +1789,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } /** {@inheritDoc} */ - @Override public IgniteFuture<?> putAllDrAsync( + @Override public IgniteInternalFuture<?> putAllDrAsync( GridCacheContext<K, V> cacheCtx, Map<? extends K, GridCacheDrInfo<V>> drMap ) { @@ -1804,12 +1805,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public <T> IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync( + @Override public <T> IgniteInternalFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync( GridCacheContext<K, V> cacheCtx, @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> map, Object... invokeArgs ) { - return (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)putAllAsync0(cacheCtx, + return (IgniteInternalFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)putAllAsync0(cacheCtx, null, map, invokeArgs, @@ -1820,7 +1821,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } /** {@inheritDoc} */ - @Override public IgniteFuture<?> removeAllDrAsync( + @Override public IgniteInternalFuture<?> removeAllDrAsync( GridCacheContext<K, V> cacheCtx, Map<? extends K, GridCacheVersion> drMap ) { @@ -1860,7 +1861,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * @param drRmvMap DR remove map (optional). * @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions). */ - protected IgniteFuture<Set<K>> enlistWrite( + protected IgniteInternalFuture<Set<K>> enlistWrite( final GridCacheContext<K, V> cacheCtx, Collection<? extends K> keys, @Nullable GridCacheEntryEx<K, V> cached, @@ -2070,7 +2071,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> // one key in the keys collection. assert keys.size() == 1; - IgniteFuture<Boolean> fut = loadMissing( + IgniteInternalFuture<Boolean> fut = loadMissing( cacheCtx, op == TRANSFORM || cacheCtx.loadPreviousValue(), true, @@ -2210,7 +2211,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> assert optimistic(); assert invokeMap != null; - IgniteFuture<Boolean> fut = loadMissing( + IgniteInternalFuture<Boolean> fut = loadMissing( cacheCtx, true, true, @@ -2439,7 +2440,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * @return Operation future. */ @SuppressWarnings("unchecked") - private IgniteFuture putAllAsync0( + private IgniteInternalFuture putAllAsync0( final GridCacheContext<K, V> cacheCtx, @Nullable Map<? extends K, ? extends V> map, @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap, @@ -2545,7 +2546,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> GridCacheProjectionImpl<K, V> prj = cacheCtx.projectionPerCall(); - final IgniteFuture<Set<K>> loadFut = enlistWrite( + final IgniteInternalFuture<Set<K>> loadFut = enlistWrite( cacheCtx, keySet, cached, @@ -2582,7 +2583,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (log.isDebugEnabled()) log.debug("Before acquiring transaction lock for put on keys: " + keys); - IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(keys, + IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(keys, lockTimeout(), this, false, @@ -2635,8 +2636,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> cctx.kernalContext()); } else { - return loadFut.chain(new CX1<IgniteFuture<Set<K>>, GridCacheReturn<V>>() { - @Override public GridCacheReturn<V> applyx(IgniteFuture<Set<K>> f) throws IgniteCheckedException { + return loadFut.chain(new CX1<IgniteInternalFuture<Set<K>>, GridCacheReturn<V>>() { + @Override public GridCacheReturn<V> applyx(IgniteInternalFuture<Set<K>> f) throws IgniteCheckedException { f.get(); return ret; @@ -2652,7 +2653,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } /** {@inheritDoc} */ - @Override public IgniteFuture<GridCacheReturn<V>> removeAllAsync( + @Override public IgniteInternalFuture<GridCacheReturn<V>> removeAllAsync( GridCacheContext<K, V> cacheCtx, Collection<? extends K> keys, @Nullable GridCacheEntryEx<K, V> cached, @@ -2671,7 +2672,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * @param filter Filter. * @return Future for asynchronous remove. */ - private IgniteFuture<GridCacheReturn<V>> removeAllAsync0( + private IgniteInternalFuture<GridCacheReturn<V>> removeAllAsync0( final GridCacheContext<K, V> cacheCtx, @Nullable final Collection<? extends K> keys, @Nullable Map<? extends K, GridCacheVersion> drMap, @@ -2751,7 +2752,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> else plc = null; - final IgniteFuture<Set<K>> loadFut = enlistWrite( + final IgniteInternalFuture<Set<K>> loadFut = enlistWrite( cacheCtx, keys0, /** cached entry */null, @@ -2782,7 +2783,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (log.isDebugEnabled()) log.debug("Before acquiring transaction lock for remove on keys: " + passedKeys); - IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(passedKeys, + IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(passedKeys, lockTimeout(), this, false, @@ -2835,8 +2836,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> cctx.kernalContext()); } else { - return loadFut.chain(new CX1<IgniteFuture<Set<K>>, GridCacheReturn<V>>() { - @Override public GridCacheReturn<V> applyx(IgniteFuture<Set<K>> f) throws IgniteCheckedException { + return loadFut.chain(new CX1<IgniteInternalFuture<Set<K>>, GridCacheReturn<V>>() { + @Override public GridCacheReturn<V> applyx(IgniteInternalFuture<Set<K>> f) throws IgniteCheckedException { f.get(); return ret; @@ -2910,7 +2911,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * Performs keys locking for affinity-based group lock transactions. * @return Lock future. */ - @Override public IgniteFuture<?> groupLockAsync(GridCacheContext<K, V> cacheCtx, Collection<K> keys) { + @Override public IgniteInternalFuture<?> groupLockAsync(GridCacheContext<K, V> cacheCtx, Collection<K> keys) { assert groupLock(); try { @@ -3349,7 +3350,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * * @param <T> Return type. */ - protected abstract class PostLockClosure1<T> implements IgniteBiClosure<Boolean, Exception, IgniteFuture<T>> { + protected abstract class PostLockClosure1<T> implements IgniteBiClosure<Boolean, Exception, IgniteInternalFuture<T>> { /** */ private static final long serialVersionUID = 0L; @@ -3380,13 +3381,13 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } /** {@inheritDoc} */ - @Override public final IgniteFuture<T> apply(Boolean locked, @Nullable final Exception e) { + @Override public final IgniteInternalFuture<T> apply(Boolean locked, @Nullable final Exception e) { if (e != null) { setRollbackOnly(); if (commit && commitAfterLock()) - return rollbackAsync().chain(new C1<IgniteFuture<IgniteTx>, T>() { - @Override public T apply(IgniteFuture<IgniteTx> f) { + return rollbackAsync().chain(new C1<IgniteInternalFuture<IgniteTx>, T>() { + @Override public T apply(IgniteInternalFuture<IgniteTx> f) { throw new GridClosureException(e); } }); @@ -3402,8 +3403,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> ", tx=" + this + ']')); if (commit && commitAfterLock()) - return rollbackAsync().chain(new C1<IgniteFuture<IgniteTx>, T>() { - @Override public T apply(IgniteFuture<IgniteTx> f) { + return rollbackAsync().chain(new C1<IgniteInternalFuture<IgniteTx>, T>() { + @Override public T apply(IgniteInternalFuture<IgniteTx> f) { throw ex; } }); @@ -3420,8 +3421,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (commit && commitAfterLock()) { rollback = false; - return commitAsync().chain(new CX1<IgniteFuture<IgniteTx>, T>() { - @Override public T applyx(IgniteFuture<IgniteTx> f) throws IgniteCheckedException { + return commitAsync().chain(new CX1<IgniteInternalFuture<IgniteTx>, T>() { + @Override public T applyx(IgniteInternalFuture<IgniteTx> f) throws IgniteCheckedException { f.get(); return r; @@ -3435,8 +3436,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } catch (final IgniteCheckedException ex) { if (commit && commitAfterLock()) - return rollbackAsync().chain(new C1<IgniteFuture<IgniteTx>, T>() { - @Override public T apply(IgniteFuture<IgniteTx> f) { + return rollbackAsync().chain(new C1<IgniteInternalFuture<IgniteTx>, T>() { + @Override public T apply(IgniteInternalFuture<IgniteTx> f) { throw new GridClosureException(ex); } }); @@ -3464,12 +3465,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * * @param <T> Return type. */ - protected abstract class PostLockClosure2<T> implements IgniteBiClosure<Boolean, Exception, IgniteFuture<T>> { + protected abstract class PostLockClosure2<T> implements IgniteBiClosure<Boolean, Exception, IgniteInternalFuture<T>> { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public final IgniteFuture<T> apply(Boolean locked, @Nullable Exception e) { + @Override public final IgniteInternalFuture<T> apply(Boolean locked, @Nullable Exception e) { boolean rollback = true; try { @@ -3480,7 +3481,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> throw new GridClosureException(new IgniteTxTimeoutException("Failed to acquire lock " + "within provided timeout for transaction [timeout=" + timeout() + ", tx=" + this + ']')); - IgniteFuture<T> fut = postLock(); + IgniteInternalFuture<T> fut = postLock(); rollback = false; @@ -3501,7 +3502,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * @return Future return value. * @throws IgniteCheckedException If operation failed. */ - protected abstract IgniteFuture<T> postLock() throws IgniteCheckedException; + protected abstract IgniteInternalFuture<T> postLock() throws IgniteCheckedException; } /** @@ -3509,19 +3510,19 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * * @param <T> Return type. */ - protected abstract class PostMissClosure<T> implements IgniteBiClosure<T, Exception, IgniteFuture<T>> { + protected abstract class PostMissClosure<T> implements IgniteBiClosure<T, Exception, IgniteInternalFuture<T>> { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public final IgniteFuture<T> apply(T t, Exception e) { + @Override public final IgniteInternalFuture<T> apply(T t, Exception e) { boolean rollback = true; try { if (e != null) throw new GridClosureException(e); - IgniteFuture<T> fut = postMiss(t); + IgniteInternalFuture<T> fut = postMiss(t); rollback = false; @@ -3543,7 +3544,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * @return Future return value. * @throws IgniteCheckedException If operation failed. */ - protected abstract IgniteFuture<T> postMiss(T t) throws IgniteCheckedException; + protected abstract IgniteInternalFuture<T> postMiss(T t) throws IgniteCheckedException; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index 8568318..f7ace8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.lang.*; @@ -71,7 +72,7 @@ public interface IgniteTxLocalEx<K, V> extends IgniteTxEx<K, V> { * @param filter Entry filter. * @return Future for this get. */ - public IgniteFuture<Map<K, V>> getAllAsync( + public IgniteInternalFuture<Map<K, V>> getAllAsync( GridCacheContext<K, V> cacheCtx, Collection<? extends K> keys, @Nullable GridCacheEntryEx<K, V> cached, @@ -87,7 +88,7 @@ public interface IgniteTxLocalEx<K, V> extends IgniteTxEx<K, V> { * @param ttl Time to live for entry. If negative, leave unchanged. * @return Future for put operation. */ - public IgniteFuture<GridCacheReturn<V>> putAllAsync( + public IgniteInternalFuture<GridCacheReturn<V>> putAllAsync( GridCacheContext<K, V> cacheCtx, Map<? extends K, ? extends V> map, boolean retval, @@ -101,7 +102,7 @@ public interface IgniteTxLocalEx<K, V> extends IgniteTxEx<K, V> { * @param invokeArgs Optional arguments for entry processor. * @return Transform operation future. */ - public <T> IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync( + public <T> IgniteInternalFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync( GridCacheContext<K, V> cacheCtx, Map<? extends K, ? extends EntryProcessor<K, V, Object>> map, Object... invokeArgs); @@ -114,7 +115,7 @@ public interface IgniteTxLocalEx<K, V> extends IgniteTxEx<K, V> { * @param filter Filter. * @return Future for asynchronous remove. */ - public IgniteFuture<GridCacheReturn<V>> removeAllAsync( + public IgniteInternalFuture<GridCacheReturn<V>> removeAllAsync( GridCacheContext<K, V> cacheCtx, Collection<? extends K> keys, @Nullable GridCacheEntryEx<K, V> cached, @@ -126,7 +127,7 @@ public interface IgniteTxLocalEx<K, V> extends IgniteTxEx<K, V> { * @param drMap DR map to put. * @return Future for DR put operation. */ - public IgniteFuture<?> putAllDrAsync( + public IgniteInternalFuture<?> putAllDrAsync( GridCacheContext<K, V> cacheCtx, Map<? extends K, GridCacheDrInfo<V>> drMap); @@ -135,7 +136,7 @@ public interface IgniteTxLocalEx<K, V> extends IgniteTxEx<K, V> { * @param drMap DR map. * @return Future for asynchronous remove. */ - public IgniteFuture<?> removeAllDrAsync( + public IgniteInternalFuture<?> removeAllDrAsync( GridCacheContext<K, V> cacheCtx, Map<? extends K, GridCacheVersion> drMap); @@ -146,7 +147,7 @@ public interface IgniteTxLocalEx<K, V> extends IgniteTxEx<K, V> { * @param keys Keys to lock. * @return Lock future. */ - public IgniteFuture<?> groupLockAsync(GridCacheContext<K, V> cacheCtx, Collection<K> keys); + public IgniteInternalFuture<?> groupLockAsync(GridCacheContext<K, V> cacheCtx, Collection<K> keys); /** * @return {@code True} if keys from the same partition are allowed to be enlisted in group-lock transaction. @@ -171,7 +172,7 @@ public interface IgniteTxLocalEx<K, V> extends IgniteTxEx<K, V> { * @param deserializePortable Deserialize portable flag. * @return Future with {@code True} value if loading took place. */ - public IgniteFuture<Boolean> loadMissing( + public IgniteInternalFuture<Boolean> loadMissing( GridCacheContext<K, V> cacheCtx, boolean readThrough, boolean async, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index c73a291..9cbd479 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import org.apache.ignite.*; import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -514,7 +515,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { * @param topVer Topology version. * @return Future that will be completed when all ongoing transactions are finished. */ - public IgniteFuture<Boolean> finishTxs(long topVer) { + public IgniteInternalFuture<Boolean> finishTxs(long topVer) { GridCompoundFuture<IgniteTx, Boolean> res = new GridCompoundFuture<>(context().kernalContext(), new IgniteReducer<IgniteTx, Boolean>() { @@ -1488,7 +1489,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { * @param threadId Near tx thread ID. * @return {@code null} if ack was received or future that will be completed when ack is received. */ - @Nullable public IgniteFuture<?> awaitFinishAckAsync(UUID rmtNodeId, long threadId) { + @Nullable public IgniteInternalFuture<?> awaitFinishAckAsync(UUID rmtNodeId, long threadId) { if (finishSyncDisabled) return null; @@ -1978,7 +1979,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { * @param req Check committed request. * @return Check committed future. */ - public IgniteFuture<GridCacheCommittedTxInfo<K, V>> checkPessimisticTxCommitted(GridCachePessimisticCheckCommittedTxRequest req) { + public IgniteInternalFuture<GridCacheCommittedTxInfo<K, V>> checkPessimisticTxCommitted(GridCachePessimisticCheckCommittedTxRequest req) { // First check if we have near transaction with this ID. IgniteTxEx<K, V> tx = localTxForRecovery(req.nearXidVersion(), !req.nearOnlyCheck()); @@ -1992,8 +1993,8 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { final IgniteTxEx<K, V> tx0 = tx; - return tx.finishFuture().chain(new C1<IgniteFuture<IgniteTx>, GridCacheCommittedTxInfo<K, V>>() { - @Override public GridCacheCommittedTxInfo<K, V> apply(IgniteFuture<IgniteTx> txFut) { + return tx.finishFuture().chain(new C1<IgniteInternalFuture<IgniteTx>, GridCacheCommittedTxInfo<K, V>>() { + @Override public GridCacheCommittedTxInfo<K, V> apply(IgniteInternalFuture<IgniteTx> txFut) { GridCacheCommittedTxInfo<K, V> info = null; if (tx0.state() == COMMITTED) @@ -2189,7 +2190,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { /** * Commit listener. Checks if commit succeeded and rollbacks if case of error. */ - private class CommitListener implements CI1<IgniteFuture<IgniteTx>> { + private class CommitListener implements CI1<IgniteInternalFuture<IgniteTx>> { /** */ private static final long serialVersionUID = 0L; @@ -2204,7 +2205,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { } /** {@inheritDoc} */ - @Override public void apply(IgniteFuture<IgniteTx> t) { + @Override public void apply(IgniteInternalFuture<IgniteTx> t) { try { t.get(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxProxyImpl.java index 3bfebc8..14311d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxProxyImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import org.apache.ignite.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; @@ -47,7 +48,7 @@ public class IgniteTxProxyImpl<K, V> implements IgniteTxProxy, Externalizable { private boolean async; /** Async call result. */ - private IgniteFuture asyncRes; + private IgniteInternalFuture asyncRes; /** * Empty constructor required for {@link Externalizable}. @@ -196,7 +197,7 @@ public class IgniteTxProxyImpl<K, V> implements IgniteTxProxy, Externalizable { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public <R> IgniteFuture<R> future() { + @Override public <R> IgniteInternalFuture<R> future() { return asyncRes; } @@ -232,7 +233,7 @@ public class IgniteTxProxyImpl<K, V> implements IgniteTxProxy, Externalizable { enter(); try { - IgniteFuture<IgniteTx> commitFut = cctx.commitTxAsync(tx); + IgniteInternalFuture<IgniteTx> commitFut = cctx.commitTxAsync(tx); if (async) asyncRes = commitFut; @@ -261,7 +262,7 @@ public class IgniteTxProxyImpl<K, V> implements IgniteTxProxy, Externalizable { enter(); try { - IgniteFuture rollbackFut = cctx.rollbackTxAsync(tx); + IgniteInternalFuture rollbackFut = cctx.rollbackTxAsync(tx); if (async) asyncRes = rollbackFut; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index a6c26e9..aefff2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -124,7 +124,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param nodes Grid nodes. * @return Task execution future. */ - public IgniteFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs, + public IgniteInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs, @Nullable Collection<ClusterNode> nodes) { return runAsync(mode, jobs, nodes, false); } @@ -136,7 +136,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param sys If {@code true}, then system pool will be used. * @return Task execution future. */ - public IgniteFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs, + public IgniteInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs, @Nullable Collection<ClusterNode> nodes, boolean sys) { assert mode != null; @@ -164,7 +164,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param nodes Grid nodes. * @return Task execution future. */ - public IgniteFuture<?> runAsync(GridClosureCallMode mode, @Nullable Runnable job, + public IgniteInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Runnable job, @Nullable Collection<ClusterNode> nodes) { return runAsync(mode, job, nodes, false); } @@ -176,7 +176,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param sys If {@code true}, then system pool will be used. * @return Task execution future. */ - public IgniteFuture<?> runAsync(GridClosureCallMode mode, @Nullable Runnable job, + public IgniteInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Runnable job, @Nullable Collection<ClusterNode> nodes, boolean sys) { assert mode != null; @@ -304,7 +304,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param <R2> Type. * @return Reduced result. */ - public <R1, R2> IgniteFuture<R2> forkjoinAsync(GridClosureCallMode mode, + public <R1, R2> IgniteInternalFuture<R2> forkjoinAsync(GridClosureCallMode mode, @Nullable Collection<? extends Callable<R1>> jobs, @Nullable IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) { assert mode != null; @@ -334,7 +334,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param <R> Type. * @return Grid future for collection of closure results. */ - public <R> IgniteFuture<Collection<R>> callAsync( + public <R> IgniteInternalFuture<Collection<R>> callAsync( GridClosureCallMode mode, @Nullable Collection<? extends Callable<R>> jobs, @Nullable Collection<ClusterNode> nodes) { @@ -349,7 +349,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param <R> Type. * @return Grid future for collection of closure results. */ - public <R> IgniteFuture<Collection<R>> callAsync(GridClosureCallMode mode, + public <R> IgniteInternalFuture<Collection<R>> callAsync(GridClosureCallMode mode, @Nullable Collection<? extends Callable<R>> jobs, @Nullable Collection<ClusterNode> nodes, boolean sys) { assert mode != null; @@ -380,7 +380,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param <R> Type. * @return Grid future for collection of closure results. */ - public <R> IgniteFuture<R> callAsync(GridClosureCallMode mode, + public <R> IgniteInternalFuture<R> callAsync(GridClosureCallMode mode, @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes) { return callAsync(mode, job, nodes, false); } @@ -392,7 +392,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param nodes Grid nodes. * @return Job future. */ - public <R> IgniteFuture<R> affinityCall(@Nullable String cacheName, Object affKey, Callable<R> job, + public <R> IgniteInternalFuture<R> affinityCall(@Nullable String cacheName, Object affKey, Callable<R> job, @Nullable Collection<ClusterNode> nodes) { enterBusy(); @@ -422,7 +422,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param nodes Grid nodes. * @return Job future. */ - public IgniteFuture<?> affinityRun(@Nullable String cacheName, Object affKey, Runnable job, + public IgniteInternalFuture<?> affinityRun(@Nullable String cacheName, Object affKey, Runnable job, @Nullable Collection<ClusterNode> nodes) { enterBusy(); @@ -453,7 +453,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param <R> Type. * @return Grid future for collection of closure results. */ - public <R> IgniteFuture<R> callAsyncNoFailover(GridClosureCallMode mode, @Nullable Callable<R> job, + public <R> IgniteInternalFuture<R> callAsyncNoFailover(GridClosureCallMode mode, @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes, boolean sys) { assert mode != null; @@ -484,7 +484,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param <R> Type. * @return Grid future for collection of closure results. */ - public <R> IgniteFuture<Collection<R>> callAsyncNoFailover(GridClosureCallMode mode, + public <R> IgniteInternalFuture<Collection<R>> callAsyncNoFailover(GridClosureCallMode mode, @Nullable Collection<? extends Callable<R>> jobs, @Nullable Collection<ClusterNode> nodes, boolean sys) { assert mode != null; @@ -516,7 +516,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param <R> Type. * @return Grid future for collection of closure results. */ - public <R> IgniteFuture<R> callAsync(GridClosureCallMode mode, + public <R> IgniteInternalFuture<R> callAsync(GridClosureCallMode mode, @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes, boolean sys) { assert mode != null; @@ -544,7 +544,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param nodes Grid nodes. * @return Grid future for execution result. */ - public <T, R> IgniteFuture<R> callAsync(IgniteClosure<T, R> job, @Nullable T arg, + public <T, R> IgniteInternalFuture<R> callAsync(IgniteClosure<T, R> job, @Nullable T arg, @Nullable Collection<ClusterNode> nodes) { enterBusy(); @@ -567,7 +567,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param nodes Grid nodes. * @return Grid future for execution result. */ - public <T, R> IgniteFuture<Collection<R>> broadcast(IgniteClosure<T, R> job, @Nullable T arg, + public <T, R> IgniteInternalFuture<Collection<R>> broadcast(IgniteClosure<T, R> job, @Nullable T arg, @Nullable Collection<ClusterNode> nodes) { enterBusy(); @@ -590,7 +590,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param nodes Grid nodes. * @return Grid future for execution result. */ - public <T, R> IgniteFuture<Collection<R>> broadcastNoFailover(IgniteClosure<T, R> job, @Nullable T arg, + public <T, R> IgniteInternalFuture<Collection<R>> broadcastNoFailover(IgniteClosure<T, R> job, @Nullable T arg, @Nullable Collection<ClusterNode> nodes) { enterBusy(); @@ -614,7 +614,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param nodes Grid nodes. * @return Grid future for execution result. */ - public <T, R> IgniteFuture<Collection<R>> callAsync(IgniteClosure<T, R> job, @Nullable Collection<? extends T> args, + public <T, R> IgniteInternalFuture<Collection<R>> callAsync(IgniteClosure<T, R> job, @Nullable Collection<? extends T> args, @Nullable Collection<ClusterNode> nodes) { enterBusy(); @@ -638,7 +638,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param nodes Grid nodes. * @return Grid future for execution result. */ - public <T, R1, R2> IgniteFuture<R2> callAsync(IgniteClosure<T, R1> job, + public <T, R1, R2> IgniteInternalFuture<R2> callAsync(IgniteClosure<T, R1> job, Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) { enterBusy(); @@ -705,7 +705,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @return Future. * @throws IgniteCheckedException Thrown in case of any errors. */ - private IgniteFuture<?> runLocal(@Nullable final Runnable c, boolean sys) throws IgniteCheckedException { + private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, boolean sys) throws IgniteCheckedException { return runLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); } @@ -715,7 +715,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @return Future. * @throws IgniteCheckedException Thrown in case of any errors. */ - private IgniteFuture<?> runLocal(@Nullable final Runnable c, GridClosurePolicy plc) throws IgniteCheckedException { + private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, GridClosurePolicy plc) throws IgniteCheckedException { if (c == null) return new GridFinishedFuture(ctx); @@ -780,7 +780,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param c Closure to execute. * @return Future. */ - public IgniteFuture<?> runLocalSafe(Runnable c) { + public IgniteInternalFuture<?> runLocalSafe(Runnable c) { return runLocalSafe(c, true); } @@ -792,7 +792,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used. * @return Future. */ - public IgniteFuture<?> runLocalSafe(Runnable c, boolean sys) { + public IgniteInternalFuture<?> runLocalSafe(Runnable c, boolean sys) { return runLocalSafe(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); } @@ -804,7 +804,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param plc Policy to choose executor pool. * @return Future. */ - public IgniteFuture<?> runLocalSafe(Runnable c, GridClosurePolicy plc) { + public IgniteInternalFuture<?> runLocalSafe(Runnable c, GridClosurePolicy plc) { try { return runLocal(c, plc); } @@ -841,7 +841,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @return Future. * @throws IgniteCheckedException Thrown in case of any errors. */ - private <R> IgniteFuture<R> callLocal(@Nullable final Callable<R> c, boolean sys) throws IgniteCheckedException { + private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, boolean sys) throws IgniteCheckedException { return callLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); } @@ -852,7 +852,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @return Future. * @throws IgniteCheckedException Thrown in case of any errors. */ - private <R> IgniteFuture<R> callLocal(@Nullable final Callable<R> c, GridClosurePolicy plc) throws IgniteCheckedException { + private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, GridClosurePolicy plc) throws IgniteCheckedException { if (c == null) return new GridFinishedFuture<>(ctx); @@ -915,7 +915,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param c Closure to execute. * @return Future. */ - public <R> IgniteFuture<R> callLocalSafe(Callable<R> c) { + public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c) { return callLocalSafe(c, true); } @@ -927,7 +927,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used. * @return Future. */ - public <R> IgniteFuture<R> callLocalSafe(Callable<R> c, boolean sys) { + public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, boolean sys) { return callLocalSafe(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); } @@ -939,7 +939,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param plc Policy to choose executor pool. * @return Future. */ - public <R> IgniteFuture<R> callLocalSafe(Callable<R> c, GridClosurePolicy plc) { + public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, GridClosurePolicy plc) { try { return callLocal(c, plc); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index c349b55..4e84eac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -373,7 +373,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @return Future. */ @SuppressWarnings("TooBroadScope") - public IgniteFuture<UUID> startRoutine(GridContinuousHandler hnd, + public IgniteInternalFuture<UUID> startRoutine(GridContinuousHandler hnd, int bufSize, long interval, boolean autoUnsubscribe, @@ -562,7 +562,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param routineId Consume ID. * @return Future. */ - public IgniteFuture<?> stopRoutine(UUID routineId) { + public IgniteInternalFuture<?> stopRoutine(UUID routineId) { assert routineId != null; boolean doStop = false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5bef132/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java index 194d30d..9d3bf70 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java @@ -22,7 +22,6 @@ import org.apache.ignite.dataload.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.thread.*; import org.apache.ignite.internal.managers.communication.*; @@ -153,8 +152,8 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter { ldrs.add(ldr); - ldr.future().listenAsync(new CI1<IgniteFuture<?>>() { - @Override public void apply(IgniteFuture<?> f) { + ldr.future().listenAsync(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { boolean b = ldrs.remove(ldr); assert b : "Loader has not been added to set: " + ldr;