http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index d57786e..8b5eaec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -18,10 +18,10 @@ package org.apache.ignite.internal.processors.cache.transactions; import org.apache.ignite.*; -import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.dr.*; @@ -40,6 +40,7 @@ import org.apache.ignite.plugin.security.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; +import javax.cache.*; import javax.cache.expiry.*; import javax.cache.processor.*; import java.io.*; @@ -480,17 +481,26 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter */ @SuppressWarnings({"CatchGenericClass"}) protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException { - CacheStoreManager store = store(); + if (!storeEnabled() || internal()) + return; + + Collection<CacheStoreManager> stores = stores(); + + if (stores == null || stores.isEmpty()) + return; + + assert 
isWriteToStoreFromDhtValid(stores) : "isWriteToStoreFromDht can't be different within one transaction"; - if (store != null && store.isWriteThrough() && storeEnabled() && - !internal() && (near() || store.isWriteToStoreFromDht())) { + boolean isWriteToStoreFromDht = F.first(stores).isWriteToStoreFromDht(); + + if (near() || isWriteToStoreFromDht) { try { if (writeEntries != null) { Map<Object, IgniteBiTuple<Object, GridCacheVersion>> putMap = null; List<Object> rmvCol = null; CacheStoreManager writeStore = null; - boolean skipNear = near() && store.isWriteToStoreFromDht(); + boolean skipNear = near() && isWriteToStoreFromDht; for (IgniteTxEntry e : writeEntries) { if ((skipNear && e.cached().isNear()) || e.skipStore()) @@ -524,11 +534,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } // Batch-process puts if cache ID has changed. - if (writeStore != null && writeStore != cacheCtx.store() && putMap != null && !putMap.isEmpty()) { - writeStore.putAll(this, putMap); + if (writeStore != null && writeStore != cacheCtx.store()) { + if (putMap != null && !putMap.isEmpty()) { + writeStore.putAll(this, putMap); - // Reset. - putMap.clear(); + // Reset. + putMap.clear(); + } writeStore = null; } @@ -544,12 +556,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter val = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(interceptorVal)); } - if (putMap == null) - putMap = new LinkedHashMap<>(writeMap().size(), 1.0f); + if (writeStore == null) + writeStore = cacheCtx.store(); - putMap.put(CU.value(key, cacheCtx, false), F.t(CU.value(val, cacheCtx, false), ver)); + if (writeStore.isWriteThrough()) { + if (putMap == null) + putMap = new LinkedHashMap<>(writeMap().size(), 1.0f); - writeStore = cacheCtx.store(); + putMap.put(CU.value(key, cacheCtx, false), F.t(CU.value(val, cacheCtx, false), ver)); + } } else if (op == DELETE) { // Batch-process all puts if needed. 
@@ -564,11 +579,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter writeStore = null; } - if (writeStore != null && writeStore != cacheCtx.store() && rmvCol != null && !rmvCol.isEmpty()) { - writeStore.removeAll(this, rmvCol); + if (writeStore != null && writeStore != cacheCtx.store()) { + if (rmvCol != null && !rmvCol.isEmpty()) { + writeStore.removeAll(this, rmvCol); - // Reset. - rmvCol.clear(); + // Reset. + rmvCol.clear(); + } writeStore = null; } @@ -581,12 +598,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter continue; } - if (rmvCol == null) - rmvCol = new ArrayList<>(); + if (writeStore == null) + writeStore = cacheCtx.store(); - rmvCol.add(key.value(cacheCtx.cacheObjectContext(), false)); + if (writeStore.isWriteThrough()) { + if (rmvCol == null) + rmvCol = new ArrayList<>(); - writeStore = cacheCtx.store(); + rmvCol.add(key.value(cacheCtx.cacheObjectContext(), false)); + } } else if (log.isDebugEnabled()) log.debug("Ignoring NOOP entry for batch store commit: " + e); @@ -610,7 +630,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } // Commit while locks are held. - store.sessionEnd(this, true); + sessionEnd(stores, true); } catch (IgniteCheckedException ex) { commitError(ex); @@ -635,6 +655,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter throw new IgniteCheckedException("Failed to commit transaction to database: " + this, ex); } + finally { + if (isRollbackOnly()) + sessionEnd(stores, false); + } } } @@ -972,24 +996,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cctx.tm().resetContext(); } } - else { - CacheStoreManager store = store(); - - if (store != null && !internal()) { - try { - store.sessionEnd(this, true); - } - catch (IgniteCheckedException e) { - commitError(e); - - setRollbackOnly(); - - cctx.tm().removeCommittedTx(this); - - throw e; - } - } - } // Do not unlock transaction entries if one-phase commit. 
if (!onePhaseCommit()) { @@ -1078,11 +1084,18 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cctx.tm().rollbackTx(this); - CacheStoreManager store = store(); + if (!internal()) { + Collection<CacheStoreManager> stores = stores(); + + if (stores != null && !stores.isEmpty()) { + assert isWriteToStoreFromDhtValid(stores) : + "isWriteToStoreFromDht can't be different within one transaction"; - if (store != null && (near() || store.isWriteToStoreFromDht())) { - if (!internal()) - store.sessionEnd(this, false); + boolean isWriteToStoreFromDht = F.first(stores).isWriteToStoreFromDht(); + + if (stores != null && !stores.isEmpty() && (near() || isWriteToStoreFromDht)) + sessionEnd(stores, false); + } } } catch (Error | IgniteCheckedException | RuntimeException e) { @@ -1094,6 +1107,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** + * @param stores Store managers. + * @param commit Commit flag. + * @throws IgniteCheckedException In case of error. + */ + private void sessionEnd(Collection<CacheStoreManager> stores, boolean commit) throws IgniteCheckedException { + Iterator<CacheStoreManager> it = stores.iterator(); + + while (it.hasNext()) { + CacheStoreManager store = it.next(); + + store.sessionEnd(this, commit, !it.hasNext()); + } + } + + /** * Checks if there is a cached or swapped value for * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean, boolean, boolean)} method. * @@ -2507,6 +2535,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** + * @param cacheCtx Cache context. + * @throws IgniteCheckedException If updates are not allowed. + */ + private void checkUpdatesAllowed(GridCacheContext cacheCtx) throws IgniteCheckedException { + if (!cacheCtx.updatesAllowed()) { + throw new IgniteTxRollbackCheckedException(new CacheException( + "Updates are not allowed for transactional cache: " + cacheCtx.name() + ". 
Configure " + + "persistence store on client or use remote closure execution to start transactions " + + "from server nodes.")); + } + } + + /** * Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap} * maps must be non-null. * @@ -2533,6 +2574,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter ) { assert filter == null || invokeMap == null; + try { + checkUpdatesAllowed(cacheCtx); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture(e); + } + cacheCtx.checkSecurity(SecurityPermission.CACHE_PUT); if (retval) @@ -2753,6 +2801,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter @Nullable GridCacheEntryEx cached, final boolean retval, @Nullable final CacheEntryPredicate[] filter) { + try { + checkUpdatesAllowed(cacheCtx); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture(e); + } + cacheCtx.checkSecurity(SecurityPermission.CACHE_REMOVE); if (retval) @@ -2983,7 +3038,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter // Check if we can enlist new cache to transaction. 
if (!activeCacheIds.contains(cacheId)) { - if (!cctx.txCompatible(this, activeCacheIds, cacheCtx)) { + String err = cctx.verifyTxCompatibility(this, activeCacheIds, cacheCtx); + + if (err != null) { StringBuilder cacheNames = new StringBuilder(); int idx = 0; @@ -2995,9 +3052,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cacheNames.append(", "); } - throw new IgniteCheckedException("Failed to enlist new cache to existing transaction " + - "(cache configurations are not compatible) [" + - "activeCaches=[" + cacheNames + "]" + + throw new IgniteCheckedException("Failed to enlist new cache to existing transaction (" + + err + + ") [activeCaches=[" + cacheNames + "]" + ", cacheName=" + cacheCtx.name() + ", cacheSystem=" + cacheCtx.systemTx() + ", txSystem=" + system() + ']'); @@ -3284,6 +3341,23 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** + * @param stores Store managers. + * @return If {@code isWriteToStoreFromDht} value same for all stores. + */ + private boolean isWriteToStoreFromDhtValid(Collection<CacheStoreManager> stores) { + if (stores != null && !stores.isEmpty()) { + boolean exp = F.first(stores).isWriteToStoreFromDht(); + + for (CacheStoreManager store : stores) { + if (store.isWriteToStoreFromDht() != exp) + return false; + } + } + + return true; + } + + /** * Post-lock closure alias. * * @param <T> Return type.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 4666cca..b6c77f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1221,9 +1221,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { collectPendingVersions(dhtTxLoc); } - // 3.1 Call dataStructures manager. - cctx.kernalContext().dataStructures().onTxCommitted(tx); - // 4. Unlock write resources. unlockMultiple(tx, tx.writeEntries()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java index a04692d..f8e5a60 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java @@ -18,12 +18,10 @@ package org.apache.ignite.internal.processors.cacheobject; import org.apache.ignite.*; -import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import 
org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; /** @@ -32,8 +30,9 @@ import org.jetbrains.annotations.*; public interface IgniteCacheObjectProcessor extends GridProcessor { /** * @see GridComponent#onKernalStart() + * @throws IgniteCheckedException If failed. */ - public void onCacheProcessorStarted(); + public void onUtilityCacheStarted() throws IgniteCheckedException; /** * @param typeName Type name. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index fe5a356..45fc121 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@ -208,7 +208,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme } /** {@inheritDoc} */ - @Override public void onCacheProcessorStarted() { + @Override public void onUtilityCacheStarted() throws IgniteCheckedException { // No-op. 
} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java new file mode 100644 index 0000000..91768a6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.lang.*; + +import java.util.*; + +/** + * + */ +public abstract class AbstractContinuousMessage implements DiscoveryCustomMessage { + /** Routine ID. */ + protected final UUID routineId; + + /** Custom message ID. */ + private final IgniteUuid id = IgniteUuid.randomUuid(); + + /** + * @param id Id. 
+ */ + protected AbstractContinuousMessage(UUID id) { + routineId = id; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** + * @return Routine ID. + */ + public UUID routineId() { + return routineId; + } + + /** {@inheritDoc} */ + @Override public boolean incrementMinorTopologyVersion() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java index eb33613..1b79430 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java @@ -23,18 +23,6 @@ import org.jetbrains.annotations.*; * Continuous processor message types. */ enum GridContinuousMessageType { - /** Consume start request. */ - MSG_START_REQ, - - /** Consume start acknowledgement. */ - MSG_START_ACK, - - /** Consume stop request. */ - MSG_STOP_REQ, - - /** Consume stop acknowledgement. */ - MSG_STOP_ACK, - /** Remote event notification. 
*/ MSG_EVT_NOTIFICATION, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 0d76ad4..38d970b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.cache.*; @@ -58,50 +59,44 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** Local infos. */ private final ConcurrentMap<UUID, LocalRoutineInfo> locInfos = new ConcurrentHashMap8<>(); + /** Local infos. */ + private final ConcurrentMap<UUID, Map<UUID, LocalRoutineInfo>> clientInfos = new ConcurrentHashMap8<>(); + /** Remote infos. */ private final ConcurrentMap<UUID, RemoteRoutineInfo> rmtInfos = new ConcurrentHashMap8<>(); /** Start futures. */ private final ConcurrentMap<UUID, StartFuture> startFuts = new ConcurrentHashMap8<>(); - /** Start ack wait lists. */ - private final ConcurrentMap<UUID, Collection<UUID>> waitForStartAck = new ConcurrentHashMap8<>(); - /** Stop futures. 
*/ private final ConcurrentMap<UUID, StopFuture> stopFuts = new ConcurrentHashMap8<>(); - /** Stop ack wait lists. */ - private final ConcurrentMap<UUID, Collection<UUID>> waitForStopAck = new ConcurrentHashMap8<>(); - /** Threads started by this processor. */ private final Collection<IgniteThread> threads = new GridConcurrentHashSet<>(); - /** Pending start requests. */ - private final Map<UUID, Collection<GridContinuousMessage>> pending = new HashMap<>(); - /** */ private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts = new ConcurrentHashMap8<>(); /** Stopped IDs. */ private final Collection<UUID> stopped = new HashSet<>(); - /** Lock for pending requests. */ - private final Lock pendingLock = new ReentrantLock(); - /** Lock for stop process. */ private final Lock stopLock = new ReentrantLock(); + /** Marshaller. */ + private Marshaller marsh; + /** Delay in milliseconds between retries. */ private long retryDelay = 1000; /** Number of retries using to send messages. */ private int retryCnt = 3; - /** Acknowledgement timeout. */ - private long ackTimeout; + /** */ + private final ReentrantReadWriteLock processorStopLock = new ReentrantReadWriteLock(); - /** Marshaller. */ - private Marshaller marsh; + /** */ + private boolean processorStopped; /** * @param ctx Kernal context. 
@@ -117,15 +112,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { retryDelay = ctx.config().getNetworkSendRetryDelay(); retryCnt = ctx.config().getNetworkSendRetryCount(); - ackTimeout = ctx.config().getNetworkTimeout(); - - if (ackTimeout < retryDelay * retryCnt) { - U.warn(log, "Acknowledgement timeout for continuous operations is less than message send " + - "retry delay multiplied by retries count (will increase timeout value) [ackTimeout=" + - ackTimeout + ", retryDelay=" + retryDelay + ", retryCnt=" + retryCnt + ']'); - - ackTimeout = retryDelay * retryCnt; - } marsh = ctx.config().getMarshaller(); @@ -133,114 +119,112 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @SuppressWarnings({"fallthrough", "TooBroadScope"}) @Override public void onEvent(Event evt) { assert evt instanceof DiscoveryEvent; + assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); - Collection<GridContinuousMessage> reqs; + clientInfos.remove(nodeId); - pendingLock.lock(); + // Unregister handlers created by left node. + for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) { + UUID routineId = e.getKey(); + RemoteRoutineInfo info = e.getValue(); - try { - // Remove pending requests to send to joined node - // (if node is left or failed, they are dropped). - reqs = pending.remove(nodeId); - } - finally { - pendingLock.unlock(); + if (info.autoUnsubscribe && nodeId.equals(info.nodeId)) + unregisterRemote(routineId); } - switch (evt.type()) { - case EVT_NODE_JOINED: - if (reqs != null) { - UUID routineId = null; + for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) { + SyncMessageAckFuture fut = e.getValue(); - // Send pending requests. 
- try { - for (GridContinuousMessage req : reqs) { - routineId = req.routineId(); + if (fut.nodeId().equals(nodeId)) { + SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey()); - sendWithRetries(nodeId, req, null); - } - } - catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send pending start request to node (is node alive?): " + - nodeId); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send pending start request to node: " + nodeId, e); + if (fut0 != null) { + ClusterTopologyCheckedException err = new ClusterTopologyCheckedException( + "Node left grid while sending message to: " + nodeId); - completeStartFuture(routineId); - } + fut0.onDone(err); } + } + } + } + }, EVT_NODE_LEFT, EVT_NODE_FAILED); - break; + ctx.event().addLocalEventListener(new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + for (Iterator<StartFuture> itr = startFuts.values().iterator(); itr.hasNext(); ) { + StartFuture fut = itr.next(); - case EVT_NODE_LEFT: - case EVT_NODE_FAILED: - // Do not wait for start acknowledgements from left node. - for (Map.Entry<UUID, Collection<UUID>> e : waitForStartAck.entrySet()) { - Collection<UUID> nodeIds = e.getValue(); + itr.remove(); - for (Iterator<UUID> it = nodeIds.iterator(); it.hasNext();) { - if (nodeId.equals(it.next())) { - it.remove(); + fut.onDone(new IgniteException("Topology segmented")); + } - break; - } - } + for (Iterator<StopFuture> itr = stopFuts.values().iterator(); itr.hasNext(); ) { + StopFuture fut = itr.next(); - if (nodeIds.isEmpty()) - completeStartFuture(e.getKey()); - } + itr.remove(); - // Do not wait for stop acknowledgements from left node. 
- for (Map.Entry<UUID, Collection<UUID>> e : waitForStopAck.entrySet()) { - Collection<UUID> nodeIds = e.getValue(); + fut.onDone(new IgniteException("Topology segmented")); + } + } + }, EVT_NODE_SEGMENTED); - for (Iterator<UUID> it = nodeIds.iterator(); it.hasNext();) { - if (nodeId.equals(it.next())) { - it.remove(); + ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessage.class, + new CustomEventListener<StartRoutineDiscoveryMessage>() { + @Override public void onCustomEvent(ClusterNode snd, StartRoutineDiscoveryMessage msg) { + if (!snd.id().equals(ctx.localNodeId()) && !ctx.isStopping()) + processStartRequest(snd, msg); + } + }); - break; - } - } + ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class, + new CustomEventListener<StartRoutineAckDiscoveryMessage>() { + @Override public void onCustomEvent(ClusterNode snd, StartRoutineAckDiscoveryMessage msg) { + StartFuture fut = startFuts.remove(msg.routineId()); - if (nodeIds.isEmpty()) - completeStopFuture(e.getKey()); - } + if (fut != null) { + if (msg.errs().isEmpty()) + fut.onRemoteRegistered(); + else { + IgniteCheckedException firstEx = F.first(msg.errs().values()); - // Unregister handlers created by left node. 
- for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) { - UUID routineId = e.getKey(); - RemoteRoutineInfo info = e.getValue(); + fut.onDone(firstEx); - if (info.autoUnsubscribe && nodeId.equals(info.nodeId)) - unregisterRemote(routineId); + stopRoutine(msg.routineId()); } + } + } + }); - for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) { - SyncMessageAckFuture fut = e.getValue(); + ctx.discovery().setCustomEventListener(StopRoutineDiscoveryMessage.class, + new CustomEventListener<StopRoutineDiscoveryMessage>() { + @Override public void onCustomEvent(ClusterNode snd, StopRoutineDiscoveryMessage msg) { + if (!snd.id().equals(ctx.localNodeId())) { + UUID routineId = msg.routineId(); - if (fut.nodeId().equals(nodeId)) { - SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey()); + unregisterRemote(routineId); - if (fut0 != null) { - ClusterTopologyCheckedException err = new ClusterTopologyCheckedException( - "Node left grid while sending message to: " + nodeId); + if (snd.isClient()) { + Map<UUID, LocalRoutineInfo> infoMap = clientInfos.get(snd.id()); - fut0.onDone(err); - } - } + if (infoMap != null) + infoMap.remove(msg.routineId()); } + } + } + }); - break; + ctx.discovery().setCustomEventListener(StopRoutineAckDiscoveryMessage.class, + new CustomEventListener<StopRoutineAckDiscoveryMessage>() { + @Override public void onCustomEvent(ClusterNode snd, StopRoutineAckDiscoveryMessage msg) { + StopFuture fut = stopFuts.remove(msg.routineId()); - default: - assert false : "Unexpected event received: " + evt.shortDisplay(); + if (fut != null) + fut.onDone(); } - } - }, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); + }); ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() { @Override public void onMessage(UUID nodeId, Object obj) { @@ -258,26 +242,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } switch (msg.type()) { - case MSG_START_REQ: - processStartRequest(nodeId, msg); - - 
break; - - case MSG_START_ACK: - processStartAck(nodeId, msg); - - break; - - case MSG_STOP_REQ: - processStopRequest(nodeId, msg); - - break; - - case MSG_STOP_ACK: - processStopAck(nodeId, msg); - - break; - case MSG_EVT_NOTIFICATION: processNotification(nodeId, msg); @@ -298,6 +262,41 @@ public class GridContinuousProcessor extends GridProcessorAdapter { log.debug("Continuous processor started."); } + /** + * @return {@code true} if lock successful, {@code false} if processor already stopped. + */ + @SuppressWarnings("LockAcquiredButNotSafelyReleased") + public boolean lockStopping() { + processorStopLock.readLock().lock(); + + if (processorStopped) { + processorStopLock.readLock().unlock(); + + return false; + } + + return true; + } + + /** + * + */ + public void unlockStopping() { + processorStopLock.readLock().unlock(); + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + processorStopLock.writeLock().lock(); + + try { + processorStopped = true; + } + finally { + processorStopLock.writeLock().unlock(); + } + } + /** {@inheritDoc} */ @Override public void stop(boolean cancel) throws IgniteCheckedException { if (ctx.config().isDaemon()) @@ -320,29 +319,19 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) { if (!nodeId.equals(ctx.localNodeId())) { - pendingLock.lock(); + DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos); - try { - // Create empty pending set. - pending.put(nodeId, new HashSet<GridContinuousMessage>()); - - DiscoveryData data = new DiscoveryData(ctx.localNodeId()); - - // Collect listeners information (will be sent to - // joining node during discovery process). 
- for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) { - UUID routineId = e.getKey(); - LocalRoutineInfo info = e.getValue(); - - data.addItem(new DiscoveryDataItem(routineId, info.prjPred, - info.hnd, info.bufSize, info.interval)); - } + // Collect listeners information (will be sent to + // joining node during discovery process). + for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) { + UUID routineId = e.getKey(); + LocalRoutineInfo info = e.getValue(); - return data; - } - finally { - pendingLock.unlock(); + data.addItem(new DiscoveryDataItem(routineId, info.prjPred, + info.hnd, info.bufSize, info.interval)); } + + return data; } else return null; @@ -366,6 +355,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } } } + + for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) { + Map<UUID, LocalRoutineInfo> map = clientInfos.get(entry.getKey()); + + if (map == null) { + map = new HashMap<>(); + + clientInfos.put(entry.getKey(), map); + } + + map.putAll(entry.getValue()); + } } } @@ -447,14 +448,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (dep == null) throw new IgniteDeploymentCheckedException("Failed to deploy projection predicate: " + prjPred); - reqData.clsName = clsName; - reqData.depInfo = new GridDeploymentInfoBean(dep); + reqData.className(clsName); + reqData.deploymentInfo(new GridDeploymentInfoBean(dep)); reqData.p2pMarshal(marsh); } // Handle peer deployment for other handler-specific objects. - reqData.hnd.p2pMarshal(ctx); + reqData.handler().p2pMarshal(ctx); } } catch (IgniteCheckedException e) { @@ -486,103 +487,24 @@ public class GridContinuousProcessor extends GridProcessorAdapter { }); } - Collection<? extends ClusterNode> nodes; - Collection<UUID> nodeIds; - - pendingLock.lock(); - - try { - // Nodes that participate in routine (request will be sent to these nodes directly). 
- nodes = F.view(ctx.discovery().allNodes(), F.and(prjPred, F.remoteNodes(ctx.localNodeId()))); - - // Stop with exception if projection is empty. - if (nodes.isEmpty() && !locIncluded) { - return new GridFinishedFuture<>( - new ClusterTopologyCheckedException("Failed to register remote continuous listener (projection is empty).")); - } - - // IDs of nodes where request will be sent. - nodeIds = new GridConcurrentHashSet<>(F.viewReadOnly(nodes, F.node2id())); - - // If there are currently joining nodes, add request to their pending lists. - // Node IDs set is updated to make sure that we wait for acknowledgement from - // these nodes. - for (Map.Entry<UUID, Collection<GridContinuousMessage>> e : pending.entrySet()) { - if (nodeIds.add(e.getKey())) - e.getValue().add(new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData, false)); - } - - // Register routine locally. - locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval)); - } - finally { - pendingLock.unlock(); - } + // Register routine locally. + locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval)); StartFuture fut = new StartFuture(ctx, routineId); - if (!nodeIds.isEmpty()) { - // Wait for acknowledgements. - waitForStartAck.put(routineId, nodeIds); - - startFuts.put(routineId, fut); - - // Register acknowledge timeout (timeout object will be removed when - // future is completed). - fut.addTimeoutObject(new GridTimeoutObjectAdapter(ackTimeout) { - @Override public void onTimeout() { - // Stop waiting for acknowledgements. - Collection<UUID> ids = waitForStartAck.remove(routineId); + startFuts.put(routineId, fut); - if (ids != null) { - StartFuture f = startFuts.remove(routineId); - - assert f != null; - - // If there are still nodes without acknowledgements, - // Stop routine with exception. Continue and complete - // future otherwise. 
- if (!ids.isEmpty()) { - f.onDone(new IgniteCheckedException("Failed to get start acknowledgement from nodes (timeout " + - "expired): " + ids + ". Will unregister all continuous listeners.")); - - stopRoutine(routineId); - } - else - f.onRemoteRegistered(); - } - } - }); + try { + ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData)); } + catch (IgniteException e) { // Marshaller exception may occur if the user passes an unmarshallable filter. + startFuts.remove(routineId); - if (!nodes.isEmpty()) { - // Do not send projection predicate (nodes already filtered). - reqData.prjPred = null; - reqData.prjPredBytes = null; - - // Send start requests. - try { - GridContinuousMessage req = new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData, false); + locInfos.remove(routineId); - sendWithRetries(nodes, req, null); - } - catch (IgniteCheckedException e) { - startFuts.remove(routineId); - waitForStartAck.remove(routineId); + fut.onDone(e); - fut.onDone(e); - - stopRoutine(routineId); - - locIncluded = false; - } - } - else { - // There are no remote nodes, but we didn't throw topology exception. - assert locIncluded; - - // Do not wait anything from remote nodes. - fut.onRemoteRegistered(); + return fut; } // Register local handler if needed. @@ -640,61 +562,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { // Unregister handler locally. unregisterHandler(routineId, routine.hnd, true); - pendingLock.lock(); - - try { - // Remove pending requests for this routine. - for (Collection<GridContinuousMessage> msgs : pending.values()) { - Iterator<GridContinuousMessage> it = msgs.iterator(); - - while (it.hasNext()) { - if (it.next().routineId().equals(routineId)) - it.remove(); - } - } - } - finally { - pendingLock.unlock(); - } - - // Nodes where to send stop requests. - Collection<?
extends ClusterNode> nodes = F.view(ctx.discovery().allNodes(), - F.and(routine.prjPred, F.remoteNodes(ctx.localNodeId()))); - - if (!nodes.isEmpty()) { - // Wait for acknowledgements. - waitForStopAck.put(routineId, new GridConcurrentHashSet<>(F.viewReadOnly(nodes, F.node2id()))); - - // Register acknowledge timeout (timeout object will be removed when - // future is completed). - fut.addTimeoutObject(new StopTimeoutObject(ackTimeout, routineId, - new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null, false))); - - // Send stop requests. - try { - for (ClusterNode node : nodes) { - try { - sendWithRetries(node.id(), - new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null, false), - null); - } - catch (ClusterTopologyCheckedException ignored) { - U.warn(log, "Failed to send stop request (node left topology): " + node.id()); - } - } - } - catch (IgniteCheckedException e) { - stopFuts.remove(routineId); - waitForStopAck.remove(routineId); - - fut.onDone(e); - } - } - else { - stopFuts.remove(routineId); + ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId)); + if (ctx.isStopping()) fut.onDone(); - } } return fut; @@ -721,13 +592,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter { assert !nodeId.equals(ctx.localNodeId()); + if (processorStopped) + return; + RemoteRoutineInfo info = rmtInfos.get(routineId); if (info != null) { assert info.interval == 0 || !sync; if (sync) { - SyncMessageAckFuture fut = new SyncMessageAckFuture(ctx, nodeId); + SyncMessageAckFuture fut = new SyncMessageAckFuture(nodeId); IgniteUuid futId = IgniteUuid.randomUuid(); @@ -779,29 +653,26 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** - * @param nodeId Sender ID. + * @param node Sender. * @param req Start request. 
*/ - private void processStartRequest(UUID nodeId, GridContinuousMessage req) { - assert nodeId != null; - assert req != null; - + private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage req) { UUID routineId = req.routineId(); - StartRequestData data = req.data(); + StartRequestData data = req.startRequestData(); - GridContinuousHandler hnd = data.hnd; + GridContinuousHandler hnd = data.handler(); IgniteCheckedException err = null; try { if (ctx.config().isPeerClassLoadingEnabled()) { - String clsName = data.clsName; + String clsName = data.className(); if (clsName != null) { - GridDeploymentInfo depInfo = data.depInfo; + GridDeploymentInfo depInfo = data.deploymentInfo(); GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, - depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null); + depInfo.userVersion(), node.id(), depInfo.classLoaderId(), depInfo.participants(), null); if (dep == null) throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); @@ -809,127 +680,58 @@ public class GridContinuousProcessor extends GridProcessorAdapter { data.p2pUnmarshal(marsh, dep.classLoader()); } - hnd.p2pUnmarshal(nodeId, ctx); + hnd.p2pUnmarshal(node.id(), ctx); } } catch (IgniteCheckedException e) { err = e; - U.error(log, "Failed to register handler [nodeId=" + nodeId + ", routineId=" + routineId + ']', e); + U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e); + } + + if (node.isClient()) { + Map<UUID, LocalRoutineInfo> clientRouteMap = clientInfos.get(node.id()); + + if (clientRouteMap == null) { + clientRouteMap = new HashMap<>(); + + Map<UUID, LocalRoutineInfo> old = clientInfos.put(node.id(), clientRouteMap); + + assert old == null; + } + + clientRouteMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(), hnd, data.bufferSize(), + data.interval())); } boolean registered = 
false; if (err == null) { try { - IgnitePredicate<ClusterNode> prjPred = data.prjPred; + IgnitePredicate<ClusterNode> prjPred = data.projectionPredicate(); + + ctx.resource().injectGeneric(prjPred); if (prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) { - registered = registerHandler(nodeId, routineId, hnd, data.bufSize, data.interval, - data.autoUnsubscribe, false); + registered = registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(), + data.autoUnsubscribe(), false); } } catch (IgniteCheckedException e) { err = e; - U.error(log, "Failed to register handler [nodeId=" + nodeId + ", routineId=" + routineId + ']', e); + U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e); } } - try { - sendWithRetries(nodeId, new GridContinuousMessage(MSG_START_ACK, routineId, null, err, false), null); - } - catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send start acknowledgement to node (is node alive?): " + nodeId); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send start acknowledgement to node: " + nodeId, e); - } + if (err != null) + req.addError(ctx.localNodeId(), err); if (registered) hnd.onListenerRegistered(routineId, ctx); } /** - * @param nodeId Sender ID. - * @param ack Start acknowledgement. 
- */ - private void processStartAck(UUID nodeId, GridContinuousMessage ack) { - assert nodeId != null; - assert ack != null; - - UUID routineId = ack.routineId(); - - final IgniteCheckedException err = ack.data(); - - if (err != null) { - if (waitForStartAck.remove(routineId) != null) { - final StartFuture fut = startFuts.remove(routineId); - - if (fut != null) { - fut.onDone(err); - - stopRoutine(routineId); - } - } - } - - Collection<UUID> nodeIds = waitForStartAck.get(routineId); - - if (nodeIds != null) { - nodeIds.remove(nodeId); - - if (nodeIds.isEmpty()) - completeStartFuture(routineId); - } - } - - /** - * @param nodeId Sender ID. - * @param req Stop request. - */ - private void processStopRequest(UUID nodeId, GridContinuousMessage req) { - assert nodeId != null; - assert req != null; - - UUID routineId = req.routineId(); - - unregisterRemote(routineId); - - try { - sendWithRetries(nodeId, new GridContinuousMessage(MSG_STOP_ACK, routineId, null, null, false), null); - } - catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send stop acknowledgement to node (is node alive?): " + nodeId); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send stop acknowledgement to node: " + nodeId, e); - } - } - - /** - * @param nodeId Sender ID. - * @param ack Stop acknowledgement. - */ - private void processStopAck(UUID nodeId, GridContinuousMessage ack) { - assert nodeId != null; - assert ack != null; - - UUID routineId = ack.routineId(); - - Collection<UUID> nodeIds = waitForStopAck.get(routineId); - - if (nodeIds != null) { - nodeIds.remove(nodeId); - - if (nodeIds.isEmpty()) - completeStopFuture(routineId); - } - } - - /** * @param msg Message. */ private void processMessageAck(GridContinuousMessage msg) { @@ -972,36 +774,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** - * @param routineId Consume ID. 
- */ - private void completeStartFuture(UUID routineId) { - assert routineId != null; - - if (waitForStartAck.remove(routineId) != null) { - StartFuture fut = startFuts.remove(routineId); - - assert fut != null; - - fut.onRemoteRegistered(); - } - } - - /** - * @param routineId Consume ID. - */ - private void completeStopFuture(UUID routineId) { - assert routineId != null; - - if (waitForStopAck.remove(routineId) != null) { - GridFutureAdapter <?> fut = stopFuts.remove(routineId); - - assert fut != null; - - fut.onDone(); - } - } - - /** * @param nodeId Node ID. * @param routineId Consume ID. * @param hnd Handler. @@ -1231,7 +1003,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * Local routine info. */ @SuppressWarnings("PackageVisibleInnerClass") - static class LocalRoutineInfo { + static class LocalRoutineInfo implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + /** Projection predicate. */ private final IgnitePredicate<ClusterNode> prjPred; @@ -1430,133 +1205,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** - * Start request data. - */ - private static class StartRequestData implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Projection predicate. */ - private IgnitePredicate<ClusterNode> prjPred; - - /** Serialized projection predicate. */ - private byte[] prjPredBytes; - - /** Deployment class name. */ - private String clsName; - - /** Deployment info. */ - private GridDeploymentInfo depInfo; - - /** Handler. */ - private GridContinuousHandler hnd; - - /** Buffer size. */ - private int bufSize; - - /** Time interval. */ - private long interval; - - /** Automatic unsubscribe flag. */ - private boolean autoUnsubscribe; - - /** - * Required by {@link Externalizable}. - */ - public StartRequestData() { - // No-op. - } - - /** - * @param prjPred Serialized projection predicate. - * @param hnd Handler. - * @param bufSize Buffer size. 
- * @param interval Time interval. - * @param autoUnsubscribe Automatic unsubscribe flag. - */ - StartRequestData(@Nullable IgnitePredicate<ClusterNode> prjPred, GridContinuousHandler hnd, - int bufSize, long interval, boolean autoUnsubscribe) { - assert hnd != null; - assert bufSize > 0; - assert interval >= 0; - - this.prjPred = prjPred; - this.hnd = hnd; - this.bufSize = bufSize; - this.interval = interval; - this.autoUnsubscribe = autoUnsubscribe; - } - - /** - * @param marsh Marshaller. - * @throws IgniteCheckedException In case of error. - */ - void p2pMarshal(Marshaller marsh) throws IgniteCheckedException { - assert marsh != null; - - prjPredBytes = marsh.marshal(prjPred); - } - - /** - * @param marsh Marshaller. - * @param ldr Class loader. - * @throws IgniteCheckedException In case of error. - */ - void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { - assert marsh != null; - - assert prjPred == null; - assert prjPredBytes != null; - - prjPred = marsh.unmarshal(prjPredBytes, ldr); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - boolean b = prjPredBytes != null; - - out.writeBoolean(b); - - if (b) { - U.writeByteArray(out, prjPredBytes); - U.writeString(out, clsName); - out.writeObject(depInfo); - } - else - out.writeObject(prjPred); - - out.writeObject(hnd); - out.writeInt(bufSize); - out.writeLong(interval); - out.writeBoolean(autoUnsubscribe); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - boolean b = in.readBoolean(); - - if (b) { - prjPredBytes = U.readByteArray(in); - clsName = U.readString(in); - depInfo = (GridDeploymentInfo)in.readObject(); - } - else - prjPred = (IgnitePredicate<ClusterNode>)in.readObject(); - - hnd = (GridContinuousHandler)in.readObject(); - bufSize = in.readInt(); - interval = in.readLong(); - autoUnsubscribe = 
in.readBoolean(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StartRequestData.class, this); - } - } - - /** * Discovery data. */ private static class DiscoveryData implements Externalizable { @@ -1570,6 +1218,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @GridToStringInclude private Collection<DiscoveryDataItem> items; + private Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos; + /** * Required by {@link Externalizable}. */ @@ -1580,11 +1230,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** * @param nodeId Node ID. */ - DiscoveryData(UUID nodeId) { + DiscoveryData(UUID nodeId, Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) { assert nodeId != null; this.nodeId = nodeId; + this.clientInfos = clientInfos; + items = new ArrayList<>(); } @@ -1599,12 +1251,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeUuid(out, nodeId); U.writeCollection(out, items); + U.writeMap(out, clientInfos); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { nodeId = U.readUuid(in); items = U.readCollection(in); + clientInfos = U.readMap(in); } /** {@inheritDoc} */ @@ -1716,13 +1370,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { private volatile GridTimeoutObject timeoutObj; /** - * Required by {@link Externalizable}. - */ - public StartFuture() { - // No-op. - } - - /** * @param ctx Kernal context. * @param routineId Consume ID. */ @@ -1833,10 +1480,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { private UUID nodeId; /** - * @param ctx Kernal context. * @param nodeId Master node ID. 
*/ - SyncMessageAckFuture(GridKernalContext ctx, UUID nodeId) { + SyncMessageAckFuture(UUID nodeId) { this.nodeId = nodeId; } @@ -1852,76 +1498,4 @@ public class GridContinuousProcessor extends GridProcessorAdapter { return S.toString(SyncMessageAckFuture.class, this); } } - - /** - * Timeout object for stop process. - */ - private class StopTimeoutObject extends GridTimeoutObjectAdapter { - /** Timeout. */ - private final long timeout; - - /** Routine ID. */ - private final UUID routineId; - - /** Request. */ - private final GridContinuousMessage req; - - /** - * @param timeout Timeout. - * @param routineId Routine ID. - * @param req Request. - */ - protected StopTimeoutObject(long timeout, UUID routineId, GridContinuousMessage req) { - super(timeout); - - assert routineId != null; - assert req != null; - - this.timeout = timeout; - this.routineId = routineId; - this.req = req; - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - Collection<UUID> ids = waitForStopAck.remove(routineId); - - if (ids != null) { - U.warn(log, "Failed to get stop acknowledgement from nodes (timeout expired): " + ids + - ". Will retry."); - - StopFuture f = stopFuts.get(routineId); - - if (f != null) { - if (!ids.isEmpty()) { - waitForStopAck.put(routineId, ids); - - // Resend requests. - for (UUID id : ids) { - try { - sendWithRetries(id, req, null); - } - catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to resend stop request to node (is node alive?): " + id); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to resend stop request to node: " + id, e); - - ids.remove(id); - - if (ids.isEmpty()) - f.onDone(e); - } - } - - // Reschedule timeout. 
- ctx.timeout().addTimeoutObject(new StopTimeoutObject(timeout, routineId, req)); - } - else if (stopFuts.remove(routineId) != null) - f.onDone(); - } - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java new file mode 100644 index 0000000..c721d44 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Start request data. + */ +class StartRequestData implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Projection predicate. */ + private IgnitePredicate<ClusterNode> prjPred; + + /** Serialized projection predicate. */ + private byte[] prjPredBytes; + + /** Deployment class name. */ + private String clsName; + + /** Deployment info. */ + private GridDeploymentInfo depInfo; + + /** Handler. */ + private GridContinuousHandler hnd; + + /** Buffer size. */ + private int bufSize; + + /** Time interval. */ + private long interval; + + /** Automatic unsubscribe flag. */ + private boolean autoUnsubscribe; + + /** + * Required by {@link java.io.Externalizable}. + */ + public StartRequestData() { + // No-op. + } + + /** + * @param prjPred Projection predicate. + * @param hnd Handler. + * @param bufSize Buffer size. + * @param interval Time interval. + * @param autoUnsubscribe Automatic unsubscribe flag. + */ + StartRequestData(@Nullable IgnitePredicate<ClusterNode> prjPred, GridContinuousHandler hnd, + int bufSize, long interval, boolean autoUnsubscribe) { + assert hnd != null; + assert bufSize > 0; + assert interval >= 0; + + this.prjPred = prjPred; + this.hnd = hnd; + this.bufSize = bufSize; + this.interval = interval; + this.autoUnsubscribe = autoUnsubscribe; + } + + /** + * @param marsh Marshaller. + * @throws org.apache.ignite.IgniteCheckedException In case of error.
+ */ + void p2pMarshal(Marshaller marsh) throws IgniteCheckedException { + assert marsh != null; + + prjPredBytes = marsh.marshal(prjPred); + } + + /** + * @param marsh Marshaller. + * @param ldr Class loader. + * @throws org.apache.ignite.IgniteCheckedException In case of error. + */ + void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { + assert marsh != null; + + assert prjPred == null; + assert prjPredBytes != null; + + prjPred = marsh.unmarshal(prjPredBytes, ldr); + } + + /** + * @return Projection predicate. + */ + public IgnitePredicate<ClusterNode> projectionPredicate() { + return prjPred; + } + + /** + * @param prjPred New projection predicate. + */ + public void projectionPredicate(IgnitePredicate<ClusterNode> prjPred) { + this.prjPred = prjPred; + } + + /** + * @return Serialized projection predicate. + */ + public byte[] projectionPredicateBytes() { + return prjPredBytes; + } + + /** + * @param prjPredBytes New serialized projection predicate. + */ + public void projectionPredicateBytes(byte[] prjPredBytes) { + this.prjPredBytes = prjPredBytes; + } + + /** + * @return Deployment class name. + */ + public String className() { + return clsName; + } + + /** + * @param clsName New deployment class name. + */ + public void className(String clsName) { + this.clsName = clsName; + } + + /** + * @return Deployment info. + */ + public GridDeploymentInfo deploymentInfo() { + return depInfo; + } + + /** + * @param depInfo New deployment info. + */ + public void deploymentInfo(GridDeploymentInfo depInfo) { + this.depInfo = depInfo; + } + + /** + * @return Handler. + */ + public GridContinuousHandler handler() { + return hnd; + } + + /** + * @param hnd New handler. + */ + public void handler(GridContinuousHandler hnd) { + this.hnd = hnd; + } + + /** + * @return Buffer size. + */ + public int bufferSize() { + return bufSize; + } + + /** + * @param bufSize New buffer size. 
+ */ + public void bufferSize(int bufSize) { + this.bufSize = bufSize; + } + + /** + * @return Time interval. + */ + public long interval() { + return interval; + } + + /** + * @param interval New time interval. + */ + public void interval(long interval) { + this.interval = interval; + } + + /** + * @return Automatic unsubscribe flag. + */ + public boolean autoUnsubscribe() { + return autoUnsubscribe; + } + + /** + * @param autoUnsubscribe New automatic unsubscribe flag. + */ + public void autoUnsubscribe(boolean autoUnsubscribe) { + this.autoUnsubscribe = autoUnsubscribe; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + boolean b = prjPredBytes != null; + + out.writeBoolean(b); + + if (b) { + U.writeByteArray(out, prjPredBytes); + U.writeString(out, clsName); + out.writeObject(depInfo); + } + else + out.writeObject(prjPred); + + out.writeObject(hnd); + out.writeInt(bufSize); + out.writeLong(interval); + out.writeBoolean(autoUnsubscribe); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + boolean b = in.readBoolean(); + + if (b) { + prjPredBytes = U.readByteArray(in); + clsName = U.readString(in); + depInfo = (GridDeploymentInfo)in.readObject(); + } + else + prjPred = (IgnitePredicate<ClusterNode>)in.readObject(); + + hnd = (GridContinuousHandler)in.readObject(); + bufSize = in.readInt(); + interval = in.readLong(); + autoUnsubscribe = in.readBoolean(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StartRequestData.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java new file mode 100644 index 0000000..5743dd4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * + */ +public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Map<UUID, IgniteCheckedException> errs; + + /** + * @param routineId Routine id. + * @param errs Errs. 
+ */ + public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs) { + super(routineId); + + this.errs = new HashMap<>(errs); + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } + + /** + * @return Errs. + */ + public Map<UUID, IgniteCheckedException> errs() { + return errs; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StartRoutineAckDiscoveryMessage.class, this, "routineId", routineId()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java new file mode 100644 index 0000000..cff6239 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * + */ +public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final StartRequestData startReqData; + + /** */ + private final Map<UUID, IgniteCheckedException> errs = new HashMap<>(); + + /** + * @param routineId Routine id. + * @param startReqData Start request data. + */ + public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) { + super(routineId); + + this.startReqData = startReqData; + } + + /** + * @return Start request data. + */ + public StartRequestData startRequestData() { + return startReqData; + } + + /** + * @param nodeId Node id. + * @param e Exception. + */ + public void addError(UUID nodeId, IgniteCheckedException e) { + errs.put(nodeId, e); + } + + /** + * @return Errs. 
+ */ + public Map<UUID, IgniteCheckedException> errs() { + return errs; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return true; + } + + /** {@inheritDoc} */ + @Override public DiscoveryCustomMessage ackMessage() { + return new StartRoutineAckDiscoveryMessage(routineId, errs); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StartRoutineDiscoveryMessage.class, this, "routineId", routineId()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java new file mode 100644 index 0000000..256791a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * + */ +public class StopRoutineAckDiscoveryMessage extends AbstractContinuousMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param routineId Routine id. + */ + public StopRoutineAckDiscoveryMessage(UUID routineId) { + super(routineId); + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StopRoutineAckDiscoveryMessage.class, this, "routineId", routineId()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java new file mode 100644 index 0000000..9dc2227 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * + */ +public class StopRoutineDiscoveryMessage extends AbstractContinuousMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param routineId Routine id. + */ + public StopRoutineDiscoveryMessage(UUID routineId) { + super(routineId); + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return new StopRoutineAckDiscoveryMessage(routineId); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StopRoutineDiscoveryMessage.class, this, "routineId", routineId()); + } +}