IGNITE-891 - Cache store improvements
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/463883d2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/463883d2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/463883d2 Branch: refs/heads/ignite-sprint-5 Commit: 463883d289ed7decf4534c4788c8a092931a77b7 Parents: f57b6f9 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Mon May 18 17:49:44 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Mon May 18 17:49:44 2015 -0700 ---------------------------------------------------------------------- .../cache/GridCacheSharedContext.java | 30 +++++++++++++++----- .../cache/store/CacheStoreManager.java | 5 ++++ .../store/GridCacheStoreManagerAdapter.java | 7 ++++- .../transactions/IgniteTxLocalAdapter.java | 27 +++++++++++++++--- 4 files changed, 57 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/463883d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index dacd1aa..b16885e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.affinity.*; +import 
org.apache.ignite.internal.processors.cache.store.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.timeout.*; @@ -427,23 +428,38 @@ public class GridCacheSharedContext<K, V> { * @param tx Transaction to check. * @param activeCacheIds Active cache IDs. * @param cacheCtx Cache context. - * @return {@code True} if cross-cache transaction can include this new cache. + * @return Error message if transactions are incompatible. */ - public boolean txCompatible(IgniteInternalTx tx, Iterable<Integer> activeCacheIds, GridCacheContext<K, V> cacheCtx) { - if (cacheCtx.systemTx() ^ tx.system()) - return false; + @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, Iterable<Integer> activeCacheIds, + GridCacheContext<K, V> cacheCtx) { + if (cacheCtx.systemTx() && !tx.system()) + return "system cache can be enlisted only in system transaction"; + + if (!cacheCtx.systemTx() && tx.system()) + return "non-system cache can't be enlisted in system transaction"; for (Integer cacheId : activeCacheIds) { GridCacheContext<K, V> activeCacheCtx = cacheContext(cacheId); - // System transactions may sap only one cache. if (cacheCtx.systemTx()) { if (activeCacheCtx.cacheId() != cacheCtx.cacheId()) - return false; + return "system transaction can include only one cache"; } + + CacheStoreManager store = cacheCtx.store(); + CacheStoreManager activeStore = activeCacheCtx.store(); + + if (store.isLocal() != activeStore.isLocal()) + return "caches with local and non-local stores can't be enlisted in one transaction"; + + if (store.isWriteBehind() != activeStore.isWriteBehind()) + return "caches with different write-behind setting can't be enlisted in one transaction"; + + // If local and write-behind validations passed, this must be true. 
+ assert store.isWriteToStoreFromDht() == activeStore.isWriteToStoreFromDht(); } - return true; + return null; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/463883d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java index 327b879..a14df6c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java @@ -68,6 +68,11 @@ public interface CacheStoreManager<K, V> extends GridCacheManager<K, V> { public boolean isWriteThrough(); /** + * @return {@code True} if write-behind is enabled. + */ + public boolean isWriteBehind(); + + /** * @return Whether DHT transaction can write to store from DHT. 
*/ public boolean isWriteToStoreFromDht(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/463883d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index aeca58f..b608bb6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -295,8 +295,13 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } /** {@inheritDoc} */ + @Override public boolean isWriteBehind() { + return cctx.config().isWriteBehindEnabled(); + } + + /** {@inheritDoc} */ @Override public boolean isWriteToStoreFromDht() { - return cctx.config().isWriteBehindEnabled() || locStore; + return isWriteBehind() || locStore; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/463883d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 854448d..4d6a544 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -488,6 +488,8 @@ public abstract 
class IgniteTxLocalAdapter extends IgniteTxAdapter if (stores == null || stores.isEmpty()) return; + assert isWriteToStoreFromDhtValid(stores) : "isWriteToStoreFromDht can't be different within one transaction"; + boolean isWriteToStoreFromDht = F.first(stores).isWriteToStoreFromDht(); if (near() || isWriteToStoreFromDht) { @@ -659,6 +661,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } + /** + * @param stores Store managers. + * @return If {@code isWriteToStoreFromDht} value same for all stores. + */ + private boolean isWriteToStoreFromDhtValid(Collection<CacheStoreManager> stores) { + boolean exp = F.first(stores).isWriteToStoreFromDht(); + + for (CacheStoreManager store : stores) { + if (store.isWriteToStoreFromDht() != exp) + return false; + } + + return true; + } + /** {@inheritDoc} */ @SuppressWarnings({"CatchGenericClass"}) @Override public void userCommit() throws IgniteCheckedException { @@ -3017,7 +3034,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter // Check if we can enlist new cache to transaction. if (!activeCacheIds.contains(cacheId)) { - if (!cctx.txCompatible(this, activeCacheIds, cacheCtx)) { + String err = cctx.verifyTxCompatibility(this, activeCacheIds, cacheCtx); + + if (err != null) { StringBuilder cacheNames = new StringBuilder(); int idx = 0; @@ -3029,9 +3048,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cacheNames.append(", "); } - throw new IgniteCheckedException("Failed to enlist new cache to existing transaction " + - "(cache configurations are not compatible) [" + - "activeCaches=[" + cacheNames + "]" + + throw new IgniteCheckedException("Failed to enlist new cache to existing transaction (" + + err + + ") [activeCaches=[" + cacheNames + "]" + ", cacheName=" + cacheCtx.name() + ", cacheSystem=" + cacheCtx.systemTx() + ", txSystem=" + system() + ']');