http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java deleted file mode 100644 index 4c91120..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java +++ /dev/null @@ -1,1846 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.continuous; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.thread.*; -import org.apache.ignite.internal.managers.communication.*; -import org.apache.ignite.internal.managers.deployment.*; -import org.apache.ignite.internal.managers.eventstorage.*; -import org.apache.ignite.internal.processors.timeout.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.worker.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.locks.*; - -import static org.apache.ignite.events.IgniteEventType.*; -import static org.apache.ignite.internal.GridTopic.*; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; -import static org.gridgain.grid.kernal.processors.continuous.GridContinuousMessageType.*; - -/** - * Processor for continuous routines. - */ -public class GridContinuousProcessor extends GridProcessorAdapter { - /** Local infos. */ - private final ConcurrentMap<UUID, LocalRoutineInfo> locInfos = new ConcurrentHashMap8<>(); - - /** Remote infos. */ - private final ConcurrentMap<UUID, RemoteRoutineInfo> rmtInfos = new ConcurrentHashMap8<>(); - - /** Start futures. */ - private final ConcurrentMap<UUID, StartFuture> startFuts = new ConcurrentHashMap8<>(); - - /** Start ack wait lists. */ - private final ConcurrentMap<UUID, Collection<UUID>> waitForStartAck = new ConcurrentHashMap8<>(); - - /** Stop futures. */ - private final ConcurrentMap<UUID, StopFuture> stopFuts = new ConcurrentHashMap8<>(); - - /** Stop ack wait lists. */ - private final ConcurrentMap<UUID, Collection<UUID>> waitForStopAck = new ConcurrentHashMap8<>(); - - /** Threads started by this processor. */ - private final Collection<IgniteThread> threads = new GridConcurrentHashSet<>(); - - /** Pending start requests. */ - private final Map<UUID, Collection<GridContinuousMessage>> pending = new HashMap<>(); - - /** */ - private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts = new ConcurrentHashMap8<>(); - - /** Stopped IDs. */ - private final Collection<UUID> stopped = new HashSet<>(); - - /** Lock for pending requests. */ - private final Lock pendingLock = new ReentrantLock(); - - /** Lock for stop process. */ - private final Lock stopLock = new ReentrantLock(); - - /** Delay in milliseconds between retries. */ - private long retryDelay = 1000; - - /** Number of retries using to send messages. */ - private int retryCnt = 3; - - /** Acknowledgement timeout. */ - private long ackTimeout; - - /** Marshaller. */ - private IgniteMarshaller marsh; - - /** - * @param ctx Kernal context. - */ - public GridContinuousProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; - - retryDelay = ctx.config().getNetworkSendRetryDelay(); - retryCnt = ctx.config().getNetworkSendRetryCount(); - ackTimeout = ctx.config().getNetworkTimeout(); - - if (ackTimeout < retryDelay * retryCnt) { - U.warn(log, "Acknowledgement timeout for continuous operations is less than message send " + - "retry delay multiplied by retries count (will increase timeout value) [ackTimeout=" + - ackTimeout + ", retryDelay=" + retryDelay + ", retryCnt=" + retryCnt + ']'); - - ackTimeout = retryDelay * retryCnt; - } - - marsh = ctx.config().getMarshaller(); - - ctx.event().addLocalEventListener(new GridLocalEventListener() { - @SuppressWarnings({"fallthrough", "TooBroadScope"}) - @Override public void onEvent(IgniteEvent evt) { - assert evt instanceof IgniteDiscoveryEvent; - - UUID nodeId = ((IgniteDiscoveryEvent)evt).eventNode().id(); - - Collection<GridContinuousMessage> reqs; - - pendingLock.lock(); - - try { - // Remove pending requests to send to joined node - // (if node is left or failed, they are dropped). - reqs = pending.remove(nodeId); - } - finally { - pendingLock.unlock(); - } - - switch (evt.type()) { - case EVT_NODE_JOINED: - if (reqs != null) { - UUID routineId = null; - - // Send pending requests. - try { - for (GridContinuousMessage req : reqs) { - routineId = req.routineId(); - - sendWithRetries(nodeId, req, null); - } - } - catch (ClusterTopologyException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send pending start request to node (is node alive?): " + - nodeId); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send pending start request to node: " + nodeId, e); - - completeStartFuture(routineId); - } - } - - break; - - case EVT_NODE_LEFT: - case EVT_NODE_FAILED: - // Do not wait for start acknowledgements from left node. - for (Map.Entry<UUID, Collection<UUID>> e : waitForStartAck.entrySet()) { - Collection<UUID> nodeIds = e.getValue(); - - for (Iterator<UUID> it = nodeIds.iterator(); it.hasNext();) { - if (nodeId.equals(it.next())) { - it.remove(); - - break; - } - } - - if (nodeIds.isEmpty()) - completeStartFuture(e.getKey()); - } - - // Do not wait for stop acknowledgements from left node. - for (Map.Entry<UUID, Collection<UUID>> e : waitForStopAck.entrySet()) { - Collection<UUID> nodeIds = e.getValue(); - - for (Iterator<UUID> it = nodeIds.iterator(); it.hasNext();) { - if (nodeId.equals(it.next())) { - it.remove(); - - break; - } - } - - if (nodeIds.isEmpty()) - completeStopFuture(e.getKey()); - } - - // Unregister handlers created by left node. - for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) { - UUID routineId = e.getKey(); - RemoteRoutineInfo info = e.getValue(); - - if (info.autoUnsubscribe && nodeId.equals(info.nodeId)) - unregisterRemote(routineId); - } - - for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) { - SyncMessageAckFuture fut = e.getValue(); - - if (fut.nodeId().equals(nodeId)) { - SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey()); - - if (fut0 != null) { - ClusterTopologyException err = new ClusterTopologyException( - "Node left grid while sending message to: " + nodeId); - - fut0.onDone(err); - } - } - } - - break; - - default: - assert false : "Unexpected event received: " + evt.shortDisplay(); - } - } - }, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); - - ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object obj) { - GridContinuousMessage msg = (GridContinuousMessage)obj; - - if (msg.data() == null && msg.dataBytes() != null) { - try { - msg.data(marsh.unmarshal(msg.dataBytes(), null)); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to process message (ignoring): " + msg, e); - - return; - } - } - - switch (msg.type()) { - case MSG_START_REQ: - processStartRequest(nodeId, msg); - - break; - - case MSG_START_ACK: - processStartAck(nodeId, msg); - - break; - - case MSG_STOP_REQ: - processStopRequest(nodeId, msg); - - break; - - case MSG_STOP_ACK: - processStopAck(nodeId, msg); - - break; - - case MSG_EVT_NOTIFICATION: - processNotification(nodeId, msg); - - break; - - case MSG_EVT_ACK: - processMessageAck(msg); - - break; - - default: - assert false : "Unexpected message received: " + msg.type(); - } - } - }); - - if (log.isDebugEnabled()) - log.debug("Continuous processor started."); - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; - - ctx.io().removeMessageListener(TOPIC_CONTINUOUS); - - U.interrupt(threads); - U.joinThreads(threads, log); - - if (log.isDebugEnabled()) - log.debug("Continuous processor stopped."); - } - - /** {@inheritDoc} */ - @Override @Nullable public Object collectDiscoveryData(UUID nodeId) { - if (!nodeId.equals(ctx.localNodeId())) { - pendingLock.lock(); - - try { - // Create empty pending set. - pending.put(nodeId, new HashSet<GridContinuousMessage>()); - - DiscoveryData data = new DiscoveryData(ctx.localNodeId()); - - // Collect listeners information (will be sent to - // joining node during discovery process). - for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) { - UUID routineId = e.getKey(); - LocalRoutineInfo info = e.getValue(); - - data.addItem(new DiscoveryDataItem(routineId, info.prjPred, - info.hnd, info.bufSize, info.interval)); - } - - return data; - } - finally { - pendingLock.unlock(); - } - } - else - return null; - } - - /** {@inheritDoc} */ - @Override public void onDiscoveryDataReceived(Object obj) { - DiscoveryData data = (DiscoveryData)obj; - - if (!ctx.isDaemon() && data != null) { - for (DiscoveryDataItem item : data.items) { - // Register handler only if local node passes projection predicate. - if (item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) { - try { - if (ctx.config().isPeerClassLoadingEnabled()) - item.hnd.p2pUnmarshal(data.nodeId, ctx); - - if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval, - item.autoUnsubscribe, false)) - item.hnd.onListenerRegistered(item.routineId, ctx); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to register continuous handler.", e); - } - } - } - } - } - - /** - * @param hnd Handler. - * @param bufSize Buffer size. - * @param interval Time interval. - * @param autoUnsubscribe Automatic unsubscribe flag. - * @param prjPred Projection predicate. - * @return Future. - */ - @SuppressWarnings("TooBroadScope") - public IgniteFuture<UUID> startRoutine(GridContinuousHandler hnd, - int bufSize, - long interval, - boolean autoUnsubscribe, - @Nullable IgnitePredicate<ClusterNode> prjPred) { - assert hnd != null; - assert bufSize > 0; - assert interval >= 0; - - // Whether local node is included in routine. - boolean locIncluded = prjPred == null || prjPred.apply(ctx.discovery().localNode()); - - // Generate ID. - final UUID routineId = UUID.randomUUID(); - - StartRequestData reqData = new StartRequestData(prjPred, hnd, bufSize, interval, autoUnsubscribe); - - try { - if (ctx.config().isPeerClassLoadingEnabled()) { - // Handle peer deployment for projection predicate. - if (prjPred != null && !U.isGrid(prjPred.getClass())) { - Class cls = U.detectClass(prjPred); - - String clsName = cls.getName(); - - GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls)); - - if (dep == null) - throw new IgniteDeploymentException("Failed to deploy projection predicate: " + prjPred); - - reqData.clsName = clsName; - reqData.depInfo = new GridDeploymentInfoBean(dep); - - reqData.p2pMarshal(marsh); - } - - // Handle peer deployment for other handler-specific objects. - hnd.p2pMarshal(ctx); - } - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(ctx, e); - } - - // Register per-routine notifications listener if ordered messaging is used. - if (hnd.orderedTopic() != null) { - ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object obj) { - GridContinuousMessage msg = (GridContinuousMessage)obj; - - // Only notification can be ordered. - assert msg.type() == MSG_EVT_NOTIFICATION; - - if (msg.data() == null && msg.dataBytes() != null) { - try { - msg.data(marsh.unmarshal(msg.dataBytes(), null)); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to process message (ignoring): " + msg, e); - - return; - } - } - - processNotification(nodeId, msg); - } - }); - } - - Collection<? extends ClusterNode> nodes; - Collection<UUID> nodeIds; - - pendingLock.lock(); - - try { - // Nodes that participate in routine (request will be sent to these nodes directly). - nodes = F.view(ctx.discovery().allNodes(), F.and(prjPred, F.remoteNodes(ctx.localNodeId()))); - - // Stop with exception if projection is empty. - if (nodes.isEmpty() && !locIncluded) { - return new GridFinishedFuture<>(ctx, - new ClusterTopologyException("Failed to register remote continuous listener (projection is empty).")); - } - - // IDs of nodes where request will be sent. - nodeIds = new GridConcurrentHashSet<>(F.viewReadOnly(nodes, F.node2id())); - - // If there are currently joining nodes, add request to their pending lists. - // Node IDs set is updated to make sure that we wait for acknowledgement from - // these nodes. - for (Map.Entry<UUID, Collection<GridContinuousMessage>> e : pending.entrySet()) { - if (nodeIds.add(e.getKey())) - e.getValue().add(new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData)); - } - - // Register routine locally. - locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval)); - } - finally { - pendingLock.unlock(); - } - - StartFuture fut = new StartFuture(ctx, routineId); - - if (!nodeIds.isEmpty()) { - // Wait for acknowledgements. - waitForStartAck.put(routineId, nodeIds); - - startFuts.put(routineId, fut); - - // Register acknowledge timeout (timeout object will be removed when - // future is completed). - fut.addTimeoutObject(new GridTimeoutObjectAdapter(ackTimeout) { - @Override public void onTimeout() { - // Stop waiting for acknowledgements. - Collection<UUID> ids = waitForStartAck.remove(routineId); - - if (ids != null) { - StartFuture f = startFuts.remove(routineId); - - assert f != null; - - // If there are still nodes without acknowledgements, - // Stop routine with exception. Continue and complete - // future otherwise. - if (!ids.isEmpty()) { - f.onDone(new IgniteCheckedException("Failed to get start acknowledgement from nodes (timeout " + - "expired): " + ids + ". Will unregister all continuous listeners.")); - - stopRoutine(routineId); - } - else - f.onRemoteRegistered(); - } - } - }); - } - - if (!nodes.isEmpty()) { - // Do not send projection predicate (nodes already filtered). - reqData.prjPred = null; - - // Send start requests. - try { - GridContinuousMessage req = new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData); - - sendWithRetries(nodes, req, null); - } - catch (IgniteCheckedException e) { - startFuts.remove(routineId); - waitForStartAck.remove(routineId); - - fut.onDone(e); - - stopRoutine(routineId); - - locIncluded = false; - } - } - else { - // There are no remote nodes, but we didn't throw topology exception. - assert locIncluded; - - // Do not wait anything from remote nodes. - fut.onRemoteRegistered(); - } - - // Register local handler if needed. - if (locIncluded) { - try { - if (registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true)) - hnd.onListenerRegistered(routineId, ctx); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(ctx, - new IgniteCheckedException("Failed to register handler locally: " + hnd, e)); - } - } - - // Handler is registered locally. - fut.onLocalRegistered(); - - return fut; - } - - /** - * @param routineId Consume ID. - * @return Future. - */ - public IgniteFuture<?> stopRoutine(UUID routineId) { - assert routineId != null; - - boolean doStop = false; - - StopFuture fut = stopFuts.get(routineId); - - // Only one thread will stop routine with provided ID. - if (fut == null) { - StopFuture old = stopFuts.putIfAbsent(routineId, fut = new StopFuture(ctx)); - - if (old != null) - fut = old; - else - doStop = true; - } - - if (doStop) { - // Unregister routine locally. - LocalRoutineInfo routine = locInfos.remove(routineId); - - // Finish if routine is not found (wrong ID is provided). - if (routine == null) { - stopFuts.remove(routineId); - - fut.onDone(); - - return fut; - } - - // Unregister handler locally. - unregisterHandler(routineId, routine.hnd, true); - - pendingLock.lock(); - - try { - // Remove pending requests for this routine. - for (Collection<GridContinuousMessage> msgs : pending.values()) { - Iterator<GridContinuousMessage> it = msgs.iterator(); - - while (it.hasNext()) { - if (it.next().routineId().equals(routineId)) - it.remove(); - } - } - } - finally { - pendingLock.unlock(); - } - - // Nodes where to send stop requests. - Collection<? extends ClusterNode> nodes = F.view(ctx.discovery().allNodes(), - F.and(routine.prjPred, F.remoteNodes(ctx.localNodeId()))); - - if (!nodes.isEmpty()) { - // Wait for acknowledgements. - waitForStopAck.put(routineId, new GridConcurrentHashSet<>(F.viewReadOnly(nodes, F.node2id()))); - - // Register acknowledge timeout (timeout object will be removed when - // future is completed). - fut.addTimeoutObject(new StopTimeoutObject(ackTimeout, routineId, - new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null))); - - // Send stop requests. - try { - for (ClusterNode node : nodes) { - try { - sendWithRetries(node.id(), - new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null), - null); - } - catch (ClusterTopologyException ignored) { - U.warn(log, "Failed to send stop request (node left topology): " + node.id()); - } - } - } - catch (IgniteCheckedException e) { - stopFuts.remove(routineId); - waitForStopAck.remove(routineId); - - fut.onDone(e); - } - } - else { - stopFuts.remove(routineId); - - fut.onDone(); - } - } - - return fut; - } - - /** - * @param nodeId ID of the node that started routine. - * @param routineId Routine ID. - * @param obj Notification object. - * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent. - * @param sync If {@code true} then waits for event acknowledgment. - * @throws IgniteCheckedException In case of error. - */ - public void addNotification(UUID nodeId, - UUID routineId, - @Nullable Object obj, - @Nullable Object orderedTopic, - boolean sync) - throws IgniteCheckedException { - assert nodeId != null; - assert routineId != null; - - assert !nodeId.equals(ctx.localNodeId()); - - RemoteRoutineInfo info = rmtInfos.get(routineId); - - if (info != null) { - assert info.interval == 0 || !sync; - - if (sync) { - SyncMessageAckFuture fut = new SyncMessageAckFuture(ctx, nodeId); - - IgniteUuid futId = IgniteUuid.randomUuid(); - - syncMsgFuts.put(futId, fut); - - try { - sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic); - } - catch (IgniteCheckedException e) { - syncMsgFuts.remove(futId); - - throw e; - } - - fut.get(); - } - else { - Collection<Object> toSnd = info.add(obj); - - if (toSnd != null) - sendNotification(nodeId, routineId, null, toSnd, orderedTopic); - } - } - } - - /** - * @param nodeId Node ID. - * @param routineId Routine ID. - * @param futId Future ID. - * @param toSnd Notification object to send. - * @param orderedTopic Topic for ordered notifications. - * If {@code null}, non-ordered message will be sent. - * @throws IgniteCheckedException In case of error. - */ - private void sendNotification(UUID nodeId, - UUID routineId, - @Nullable IgniteUuid futId, - Collection<Object> toSnd, - @Nullable Object orderedTopic) throws IgniteCheckedException { - assert nodeId != null; - assert routineId != null; - assert toSnd != null; - assert !toSnd.isEmpty(); - - sendWithRetries(nodeId, new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd), orderedTopic); - } - - /** - * @param nodeId Sender ID. - * @param req Start request. - */ - private void processStartRequest(UUID nodeId, GridContinuousMessage req) { - assert nodeId != null; - assert req != null; - - UUID routineId = req.routineId(); - StartRequestData data = req.data(); - - GridContinuousHandler hnd = data.hnd; - - IgniteCheckedException err = null; - - try { - if (ctx.config().isPeerClassLoadingEnabled()) { - String clsName = data.clsName; - - if (clsName != null) { - GridDeploymentInfo depInfo = data.depInfo; - - GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, - depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null); - - if (dep == null) - throw new IgniteDeploymentException("Failed to obtain deployment for class: " + clsName); - - data.p2pUnmarshal(marsh, dep.classLoader()); - } - - hnd.p2pUnmarshal(nodeId, ctx); - } - } - catch (IgniteCheckedException e) { - err = e; - - U.error(log, "Failed to register handler [nodeId=" + nodeId + ", routineId=" + routineId + ']', e); - } - - boolean registered = false; - - if (err == null) { - try { - IgnitePredicate<ClusterNode> prjPred = data.prjPred; - - if (prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) { - registered = registerHandler(nodeId, routineId, hnd, data.bufSize, data.interval, - data.autoUnsubscribe, false); - } - } - catch (IgniteCheckedException e) { - err = e; - - U.error(log, "Failed to register handler [nodeId=" + nodeId + ", routineId=" + routineId + ']', e); - } - } - - try { - sendWithRetries(nodeId, new GridContinuousMessage(MSG_START_ACK, routineId, null, err), null); - } - catch (ClusterTopologyException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send start acknowledgement to node (is node alive?): " + nodeId); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send start acknowledgement to node: " + nodeId, e); - } - - if (registered) - hnd.onListenerRegistered(routineId, ctx); - } - - /** - * @param nodeId Sender ID. - * @param ack Start acknowledgement. - */ - private void processStartAck(UUID nodeId, GridContinuousMessage ack) { - assert nodeId != null; - assert ack != null; - - UUID routineId = ack.routineId(); - - final IgniteCheckedException err = ack.data(); - - if (err != null) { - if (waitForStartAck.remove(routineId) != null) { - final StartFuture fut = startFuts.remove(routineId); - - if (fut != null) { - fut.onDone(err); - - stopRoutine(routineId); - } - } - } - - Collection<UUID> nodeIds = waitForStartAck.get(routineId); - - if (nodeIds != null) { - nodeIds.remove(nodeId); - - if (nodeIds.isEmpty()) - completeStartFuture(routineId); - } - } - - /** - * @param nodeId Sender ID. - * @param req Stop request. - */ - private void processStopRequest(UUID nodeId, GridContinuousMessage req) { - assert nodeId != null; - assert req != null; - - UUID routineId = req.routineId(); - - unregisterRemote(routineId); - - try { - sendWithRetries(nodeId, new GridContinuousMessage(MSG_STOP_ACK, routineId, null, null), null); - } - catch (ClusterTopologyException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send stop acknowledgement to node (is node alive?): " + nodeId); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send stop acknowledgement to node: " + nodeId, e); - } - } - - /** - * @param nodeId Sender ID. - * @param ack Stop acknowledgement. - */ - private void processStopAck(UUID nodeId, GridContinuousMessage ack) { - assert nodeId != null; - assert ack != null; - - UUID routineId = ack.routineId(); - - Collection<UUID> nodeIds = waitForStopAck.get(routineId); - - if (nodeIds != null) { - nodeIds.remove(nodeId); - - if (nodeIds.isEmpty()) - completeStopFuture(routineId); - } - } - - /** - * @param msg Message. - */ - private void processMessageAck(GridContinuousMessage msg) { - assert msg.futureId() != null; - - SyncMessageAckFuture fut = syncMsgFuts.remove(msg.futureId()); - - if (fut != null) - fut.onDone(); - } - - /** - * @param nodeId Sender ID. - * @param msg Message. - */ - private void processNotification(UUID nodeId, GridContinuousMessage msg) { - assert nodeId != null; - assert msg != null; - - UUID routineId = msg.routineId(); - - try { - LocalRoutineInfo routine = locInfos.get(routineId); - - if (routine != null) - routine.hnd.notifyCallback(nodeId, routineId, (Collection<?>)msg.data(), ctx); - } - finally { - if (msg.futureId() != null) { - try { - sendWithRetries(nodeId, - new GridContinuousMessage(MSG_EVT_ACK, null, msg.futureId(), null), - null); - } - catch (IgniteCheckedException e) { - log.error("Failed to send event acknowledgment to node: " + nodeId, e); - } - } - } - } - - /** - * @param routineId Consume ID. - */ - private void completeStartFuture(UUID routineId) { - assert routineId != null; - - if (waitForStartAck.remove(routineId) != null) { - StartFuture fut = startFuts.remove(routineId); - - assert fut != null; - - fut.onRemoteRegistered(); - } - } - - /** - * @param routineId Consume ID. - */ - private void completeStopFuture(UUID routineId) { - assert routineId != null; - - if (waitForStopAck.remove(routineId) != null) { - GridFutureAdapter <?> fut = stopFuts.remove(routineId); - - assert fut != null; - - fut.onDone(); - } - } - - /** - * @param nodeId Node ID. - * @param routineId Consume ID. - * @param hnd Handler. - * @param bufSize Buffer size. - * @param interval Time interval. - * @param autoUnsubscribe Automatic unsubscribe flag. - * @param loc Local registration flag. - * @return Whether listener was actually registered. - * @throws IgniteCheckedException In case of error. - */ - private boolean registerHandler(final UUID nodeId, - final UUID routineId, - final GridContinuousHandler hnd, - int bufSize, - final long interval, - boolean autoUnsubscribe, - boolean loc) throws IgniteCheckedException { - assert nodeId != null; - assert routineId != null; - assert hnd != null; - assert bufSize > 0; - assert interval >= 0; - - final RemoteRoutineInfo info = new RemoteRoutineInfo(nodeId, hnd, bufSize, interval, autoUnsubscribe); - - boolean doRegister = loc; - - if (!doRegister) { - stopLock.lock(); - - try { - doRegister = !stopped.remove(routineId) && rmtInfos.putIfAbsent(routineId, info) == null; - } - finally { - stopLock.unlock(); - } - } - - if (doRegister) { - if (interval > 0) { - IgniteThread checker = new IgniteThread(new GridWorker(ctx.gridName(), "continuous-buffer-checker", log) { - @SuppressWarnings("ConstantConditions") - @Override protected void body() { - long interval0 = interval; - - while (!isCancelled()) { - try { - U.sleep(interval0); - } - catch (IgniteInterruptedException ignored) { - break; - } - - IgniteBiTuple<Collection<Object>, Long> t = info.checkInterval(); - - Collection<Object> toSnd = t.get1(); - - if (toSnd != null) { - try { - sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic()); - } - catch (ClusterTopologyException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send notification to node (is node alive?): " + nodeId); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send notification to node: " + nodeId, e); - } - } - - interval0 = t.get2(); - } - } - }); - - threads.add(checker); - - checker.start(); - } - - return hnd.register(nodeId, routineId, ctx); - } - - return false; - } - - /** - * @param routineId Routine ID. - * @param hnd Handler - * @param loc If Handler unregistered on master node. - */ - private void unregisterHandler(UUID routineId, GridContinuousHandler hnd, boolean loc) { - assert routineId != null; - assert hnd != null; - - if (loc && hnd.orderedTopic() != null) - ctx.io().removeMessageListener(hnd.orderedTopic()); - - hnd.unregister(routineId, ctx); - } - - /** - * @param routineId Routine ID. - */ - @SuppressWarnings("TooBroadScope") - private void unregisterRemote(UUID routineId) { - RemoteRoutineInfo info; - - stopLock.lock(); - - try { - info = rmtInfos.remove(routineId); - - if (info == null) - stopped.add(routineId); - } - finally { - stopLock.unlock(); - } - - if (info != null) - unregisterHandler(routineId, info.hnd, false); - } - - /** - * @param nodeId Destination node ID. - * @param msg Message. - * @param orderedTopic Topic for ordered notifications. - * If {@code null}, non-ordered message will be sent. - * @throws IgniteCheckedException In case of error. - */ - private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic) - throws IgniteCheckedException { - assert nodeId != null; - assert msg != null; - - ClusterNode node = ctx.discovery().node(nodeId); - - if (node != null) - sendWithRetries(node, msg, orderedTopic); - else - throw new ClusterTopologyException("Node for provided ID doesn't exist (did it leave the grid?): " + nodeId); - } - - /** - * @param node Destination node. - * @param msg Message. - * @param orderedTopic Topic for ordered notifications. - * If {@code null}, non-ordered message will be sent. - * @throws IgniteCheckedException In case of error. - */ - private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic) - throws IgniteCheckedException { - assert node != null; - assert msg != null; - - sendWithRetries(F.asList(node), msg, orderedTopic); - } - - /** - * @param nodes Destination nodes. - * @param msg Message. - * @param orderedTopic Topic for ordered notifications. - * If {@code null}, non-ordered message will be sent. - * @throws IgniteCheckedException In case of error. - */ - private void sendWithRetries(Collection<? extends ClusterNode> nodes, GridContinuousMessage msg, - @Nullable Object orderedTopic) throws IgniteCheckedException { - assert !F.isEmpty(nodes); - assert msg != null; - - if (msg.data() != null && (nodes.size() > 1 || !ctx.localNodeId().equals(F.first(nodes).id()))) - msg.dataBytes(marsh.marshal(msg.data())); - - boolean first = true; - - for (ClusterNode node : nodes) { - msg = first ? msg : (GridContinuousMessage)msg.clone(); - - first = false; - - int cnt = 0; - - while (cnt <= retryCnt) { - try { - cnt++; - - if (orderedTopic != null) { - ctx.io().sendOrderedMessage( - node, - orderedTopic, - ctx.io().nextMessageId(orderedTopic, node.id()), - msg, - SYSTEM_POOL, - 0, - true); - } - else - ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL); - - break; - } - catch (IgniteInterruptedException e) { - throw e; - } - catch (IgniteCheckedException e) { - if (!ctx.discovery().alive(node.id())) - throw new ClusterTopologyException("Node left grid while sending message to: " + node.id(), e); - - if (cnt == retryCnt) - throw e; - else if (log.isDebugEnabled()) - log.debug("Failed to send message to node (will retry): " + node.id()); - } - - U.sleep(retryDelay); - } - } - } - - /** - * Local routine info. - */ - @SuppressWarnings("PackageVisibleInnerClass") - static class LocalRoutineInfo { - /** Projection predicate. */ - private final IgnitePredicate<ClusterNode> prjPred; - - /** Continuous routine handler. */ - private final GridContinuousHandler hnd; - - /** Buffer size. */ - private final int bufSize; - - /** Time interval. */ - private final long interval; - - /** - * @param prjPred Projection predicate. - * @param hnd Continuous routine handler. - * @param bufSize Buffer size. - * @param interval Interval. - */ - LocalRoutineInfo(@Nullable IgnitePredicate<ClusterNode> prjPred, GridContinuousHandler hnd, int bufSize, - long interval) { - assert hnd != null; - assert bufSize > 0; - assert interval >= 0; - - this.prjPred = prjPred; - this.hnd = hnd; - this.bufSize = bufSize; - this.interval = interval; - } - - /** - * @return Handler. - */ - GridContinuousHandler handler() { - return hnd; - } - } - - /** - * Remote routine info. - */ - private static class RemoteRoutineInfo { - /** Master node ID. */ - private final UUID nodeId; - - /** Continuous routine handler. */ - private final GridContinuousHandler hnd; - - /** Buffer size. */ - private final int bufSize; - - /** Time interval. */ - private final long interval; - - /** Lock. */ - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - - /** Buffer. */ - private ConcurrentLinkedDeque8<Object> buf; - - /** Last send time. */ - private long lastSndTime = U.currentTimeMillis(); - - /** Automatic unsubscribe flag. */ - private boolean autoUnsubscribe; - - /** - * @param nodeId Master node ID. - * @param hnd Continuous routine handler. - * @param bufSize Buffer size. - * @param interval Interval. - * @param autoUnsubscribe Automatic unsubscribe flag. - */ - RemoteRoutineInfo(UUID nodeId, GridContinuousHandler hnd, int bufSize, long interval, - boolean autoUnsubscribe) { - assert nodeId != null; - assert hnd != null; - assert bufSize > 0; - assert interval >= 0; - - this.nodeId = nodeId; - this.hnd = hnd; - this.bufSize = bufSize; - this.interval = interval; - this.autoUnsubscribe = autoUnsubscribe; - - buf = new ConcurrentLinkedDeque8<>(); - } - - /** - * @param obj Object to add. - * @return Object to send or {@code null} if there is nothing to send for now. - */ - @Nullable Collection<Object> add(@Nullable Object obj) { - Collection<Object> toSnd = null; - - if (buf.sizex() >= bufSize - 1) { - lock.writeLock().lock(); - - try { - buf.add(obj); - - toSnd = buf; - - buf = new ConcurrentLinkedDeque8<>(); - - if (interval > 0) - lastSndTime = U.currentTimeMillis(); - } - finally { - lock.writeLock().unlock(); - } - } - else { - lock.readLock().lock(); - - try { - buf.add(obj); - } - finally { - lock.readLock().unlock(); - } - } - - return toSnd != null ? new ArrayList<>(toSnd) : null; - } - - /** - * @return Tuple with objects to sleep (or {@code null} if there is nothing to - * send for now) and time interval after next check is needed. - */ - @SuppressWarnings("TooBroadScope") - IgniteBiTuple<Collection<Object>, Long> checkInterval() { - assert interval > 0; - - Collection<Object> toSnd = null; - long diff; - - long now = U.currentTimeMillis(); - - lock.writeLock().lock(); - - try { - diff = now - lastSndTime; - - if (diff >= interval && !buf.isEmpty()) { - toSnd = buf; - - buf = new ConcurrentLinkedDeque8<>(); - - lastSndTime = now; - } - } - finally { - lock.writeLock().unlock(); - } - - return F.t(toSnd, diff < interval ? interval - diff : interval); - } - } - - /** - * Start request data. - */ - private static class StartRequestData implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Projection predicate. */ - private IgnitePredicate<ClusterNode> prjPred; - - /** Serialized projection predicate. */ - private byte[] prjPredBytes; - - /** Deployment class name. */ - private String clsName; - - /** Deployment info. */ - private GridDeploymentInfo depInfo; - - /** Handler. */ - private GridContinuousHandler hnd; - - /** Buffer size. */ - private int bufSize; - - /** Time interval. */ - private long interval; - - /** Automatic unsubscribe flag. */ - private boolean autoUnsubscribe; - - /** - * Required by {@link Externalizable}. - */ - public StartRequestData() { - // No-op. - } - - /** - * @param prjPred Serialized projection predicate. - * @param hnd Handler. - * @param bufSize Buffer size. - * @param interval Time interval. - * @param autoUnsubscribe Automatic unsubscribe flag. - */ - StartRequestData(@Nullable IgnitePredicate<ClusterNode> prjPred, GridContinuousHandler hnd, - int bufSize, long interval, boolean autoUnsubscribe) { - assert hnd != null; - assert bufSize > 0; - assert interval >= 0; - - this.prjPred = prjPred; - this.hnd = hnd; - this.bufSize = bufSize; - this.interval = interval; - this.autoUnsubscribe = autoUnsubscribe; - } - - /** - * @param marsh Marshaller. - * @throws IgniteCheckedException In case of error. - */ - void p2pMarshal(IgniteMarshaller marsh) throws IgniteCheckedException { - assert marsh != null; - - prjPredBytes = marsh.marshal(prjPred); - } - - /** - * @param marsh Marshaller. - * @param ldr Class loader. - * @throws IgniteCheckedException In case of error. - */ - void p2pUnmarshal(IgniteMarshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { - assert marsh != null; - - assert prjPred == null; - assert prjPredBytes != null; - - prjPred = marsh.unmarshal(prjPredBytes, ldr); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - boolean b = prjPredBytes != null; - - out.writeBoolean(b); - - if (b) { - U.writeByteArray(out, prjPredBytes); - U.writeString(out, clsName); - out.writeObject(depInfo); - } - else - out.writeObject(prjPred); - - out.writeObject(hnd); - out.writeInt(bufSize); - out.writeLong(interval); - out.writeBoolean(autoUnsubscribe); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - boolean b = in.readBoolean(); - - if (b) { - prjPredBytes = U.readByteArray(in); - clsName = U.readString(in); - depInfo = (GridDeploymentInfo)in.readObject(); - } - else - prjPred = (IgnitePredicate<ClusterNode>)in.readObject(); - - hnd = (GridContinuousHandler)in.readObject(); - bufSize = in.readInt(); - interval = in.readLong(); - autoUnsubscribe = in.readBoolean(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StartRequestData.class, this); - } - } - - /** - * Discovery data. - */ - private static class DiscoveryData implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Node ID. */ - private UUID nodeId; - - /** Items. */ - @GridToStringInclude - private Collection<DiscoveryDataItem> items; - - /** - * Required by {@link Externalizable}. - */ - public DiscoveryData() { - // No-op. - } - - /** - * @param nodeId Node ID. - */ - DiscoveryData(UUID nodeId) { - assert nodeId != null; - - this.nodeId = nodeId; - - items = new ArrayList<>(); - } - - /** - * @param item Item. - */ - public void addItem(DiscoveryDataItem item) { - items.add(item); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeUuid(out, nodeId); - U.writeCollection(out, items); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - nodeId = U.readUuid(in); - items = U.readCollection(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DiscoveryData.class, this); - } - } - - /** - * Discovery data item. - */ - private static class DiscoveryDataItem implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Consume ID. */ - private UUID routineId; - - /** Projection predicate. */ - private IgnitePredicate<ClusterNode> prjPred; - - /** Handler. */ - private GridContinuousHandler hnd; - - /** Buffer size. */ - private int bufSize; - - /** Time interval. */ - private long interval; - - /** Automatic unsubscribe flag. */ - private boolean autoUnsubscribe; - - /** - * Required by {@link Externalizable}. - */ - public DiscoveryDataItem() { - // No-op. - } - - /** - * @param routineId Consume ID. - * @param prjPred Projection predicate. - * @param hnd Handler. - * @param bufSize Buffer size. - * @param interval Time interval. - */ - DiscoveryDataItem(UUID routineId, @Nullable IgnitePredicate<ClusterNode> prjPred, - GridContinuousHandler hnd, int bufSize, long interval) { - assert routineId != null; - assert hnd != null; - assert bufSize > 0; - assert interval >= 0; - - this.routineId = routineId; - this.prjPred = prjPred; - this.hnd = hnd; - this.bufSize = bufSize; - this.interval = interval; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeUuid(out, routineId); - out.writeObject(prjPred); - out.writeObject(hnd); - out.writeInt(bufSize); - out.writeLong(interval); - out.writeBoolean(autoUnsubscribe); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - routineId = U.readUuid(in); - prjPred = (IgnitePredicate<ClusterNode>)in.readObject(); - hnd = (GridContinuousHandler)in.readObject(); - bufSize = in.readInt(); - interval = in.readLong(); - autoUnsubscribe = in.readBoolean(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DiscoveryDataItem.class, this); - } - } - - /** - * Future for start routine. - */ - private static class StartFuture extends GridFutureAdapter<UUID> { - /** */ - private static final long serialVersionUID = 0L; - - /** Consume ID. */ - private UUID routineId; - - /** Local listener is registered. */ - private volatile boolean loc; - - /** All remote listeners are registered. */ - private volatile boolean rmt; - - /** Timeout object. */ - private volatile GridTimeoutObject timeoutObj; - - /** - * Required by {@link Externalizable}. - */ - public StartFuture() { - // No-op. - } - - /** - * @param ctx Kernal context. - * @param routineId Consume ID. - */ - StartFuture(GridKernalContext ctx, UUID routineId) { - super(ctx); - - this.routineId = routineId; - } - - /** - * Called when local listener is registered. - */ - public void onLocalRegistered() { - loc = true; - - if (rmt && !isDone()) - onDone(routineId); - } - - /** - * Called when all remote listeners are registered. - */ - public void onRemoteRegistered() { - rmt = true; - - if (loc && !isDone()) - onDone(routineId); - } - - /** - * @param timeoutObj Timeout object. - */ - public void addTimeoutObject(GridTimeoutObject timeoutObj) { - assert timeoutObj != null; - - this.timeoutObj = timeoutObj; - - ctx.timeout().addTimeoutObject(timeoutObj); - } - - /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable UUID res, @Nullable Throwable err) { - if (timeoutObj != null) - ctx.timeout().removeTimeoutObject(timeoutObj); - - return super.onDone(res, err); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StartFuture.class, this); - } - } - - /** - * Future for stop routine. - */ - private static class StopFuture extends GridFutureAdapter<Object> { - /** */ - private static final long serialVersionUID = 0L; - - /** Timeout object. */ - private volatile GridTimeoutObject timeoutObj; - - /** - * Required by {@link Externalizable}. - */ - public StopFuture() { - // No-op. - } - - /** - * @param ctx Kernal context. - */ - StopFuture(GridKernalContext ctx) { - super(ctx); - } - - /** - * @param timeoutObj Timeout object. - */ - public void addTimeoutObject(GridTimeoutObject timeoutObj) { - assert timeoutObj != null; - - this.timeoutObj = timeoutObj; - - ctx.timeout().addTimeoutObject(timeoutObj); - } - - /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { - if (timeoutObj != null) - ctx.timeout().removeTimeoutObject(timeoutObj); - - return super.onDone(res, err); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StopFuture.class, this); - } - } - - /** - * Synchronous message acknowledgement future. - */ - private static class SyncMessageAckFuture extends GridFutureAdapter<Object> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private UUID nodeId; - - /** - * Required by {@link Externalizable}. - */ - public SyncMessageAckFuture() { - // No-op. - } - - /** - * @param ctx Kernal context. - * @param nodeId Master node ID. - */ - SyncMessageAckFuture(GridKernalContext ctx, UUID nodeId) { - super(ctx); - - this.nodeId = nodeId; - } - - /** - * @return Master node ID. - */ - UUID nodeId() { - return nodeId; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(SyncMessageAckFuture.class, this); - } - } - - /** - * Timeout object for stop process. - */ - private class StopTimeoutObject extends GridTimeoutObjectAdapter { - /** Timeout. */ - private final long timeout; - - /** Routine ID. */ - private final UUID routineId; - - /** Request. */ - private final GridContinuousMessage req; - - /** - * @param timeout Timeout. - * @param routineId Routine ID. - * @param req Request. - */ - protected StopTimeoutObject(long timeout, UUID routineId, GridContinuousMessage req) { - super(timeout); - - assert routineId != null; - assert req != null; - - this.timeout = timeout; - this.routineId = routineId; - this.req = req; - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - Collection<UUID> ids = waitForStopAck.remove(routineId); - - if (ids != null) { - U.warn(log, "Failed to get stop acknowledgement from nodes (timeout expired): " + ids + - ". Will retry."); - - StopFuture f = stopFuts.get(routineId); - - if (f != null) { - if (!ids.isEmpty()) { - waitForStopAck.put(routineId, ids); - - // Resend requests. - for (UUID id : ids) { - try { - sendWithRetries(id, req, null); - } - catch (ClusterTopologyException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to resend stop request to node (is node alive?): " + id); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to resend stop request to node: " + id, e); - - ids.remove(id); - - if (ids.isEmpty()) - f.onDone(e); - } - } - - // Reschedule timeout. - ctx.timeout().addTimeoutObject(new StopTimeoutObject(timeout, routineId, req)); - } - else if (stopFuts.remove(routineId) != null) - f.onDone(); - } - } - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java deleted file mode 100644 index c5b9f1b..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.dataload; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.dataload.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.transactions.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -import static org.apache.ignite.cache.GridCacheAtomicityMode.*; -import static org.apache.ignite.transactions.IgniteTxConcurrency.*; -import static org.apache.ignite.transactions.IgniteTxIsolation.*; - -/** - * Bundled factory for cache updaters. - */ -public class GridDataLoadCacheUpdaters { - /** */ - private static final IgniteDataLoadCacheUpdater INDIVIDUAL = new Individual(); - - /** */ - private static final IgniteDataLoadCacheUpdater BATCHED = new Batched(); - - /** */ - private static final IgniteDataLoadCacheUpdater BATCHED_SORTED = new BatchedSorted(); - - /** */ - private static final IgniteDataLoadCacheUpdater GROUP_LOCKED = new GroupLocked(); - - /** - * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and - * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance - * is not the best. - * - * @return Single updater. - */ - public static <K, V> IgniteDataLoadCacheUpdater<K, V> individual() { - return INDIVIDUAL; - } - - /** - * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and - * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same keys are getting - * updated concurrently. Performance is generally better than in {@link #individual()}. - * - * @return Batched updater. - */ - public static <K, V> IgniteDataLoadCacheUpdater<K, V> batched() { - return BATCHED; - } - - /** - * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and - * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order and if all updates - * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}. - * - * @return Batched sorted updater. - */ - public static <K extends Comparable<?>, V> IgniteDataLoadCacheUpdater<K, V> batchedSorted() { - return BATCHED_SORTED; - } - - /** - * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and - * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])} in group lock transaction. Requires that there are no - * concurrent updates other than in group lock. - * - * @return Updater with group lock. - */ - public static <K, V> IgniteDataLoadCacheUpdater<K, V> groupLocked() { - return GROUP_LOCKED; - } - - /** - * Updates cache. - * - * @param cache Cache. - * @param rmvCol Keys to remove. - * @param putMap Entries to put. - * @throws IgniteCheckedException If failed. - */ - protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Collection<K> rmvCol, - Map<K, V> putMap) throws IgniteCheckedException { - assert rmvCol != null || putMap != null; - - // Here we assume that there are no key duplicates, so the following calls are valid. - if (rmvCol != null) - ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol); - - if (putMap != null) - cache.putAll(putMap); - } - - /** - * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone. - */ - private static class Individual<K, V> implements IgniteDataLoadCacheUpdater<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) - throws IgniteCheckedException { - assert cache != null; - assert !F.isEmpty(entries); - - for (Map.Entry<K, V> entry : entries) { - K key = entry.getKey(); - - assert key != null; - - V val = entry.getValue(); - - if (val == null) - cache.remove(key); - else - cache.put(key, val); - } - } - } - - /** - * Batched updater. Updates cache using batch operations thus is dead lock prone. - */ - private static class Batched<K, V> implements IgniteDataLoadCacheUpdater<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) - throws IgniteCheckedException { - assert cache != null; - assert !F.isEmpty(entries); - - Map<K, V> putAll = null; - Collection<K> rmvAll = null; - - for (Map.Entry<K, V> entry : entries) { - K key = entry.getKey(); - - assert key != null; - - V val = entry.getValue(); - - if (val == null) { - if (rmvAll == null) - rmvAll = new ArrayList<>(); - - rmvAll.add(key); - } - else { - if (putAll == null) - putAll = new HashMap<>(); - - putAll.put(key, val); - } - } - - updateAll(cache, rmvAll, putAll); - } - } - - /** - * Batched updater. Updates cache using batch operations thus is dead lock prone. - */ - private static class BatchedSorted<K, V> implements IgniteDataLoadCacheUpdater<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) - throws IgniteCheckedException { - assert cache != null; - assert !F.isEmpty(entries); - - Map<K, V> putAll = null; - Collection<K> rmvAll = null; - - for (Map.Entry<K, V> entry : entries) { - K key = entry.getKey(); - - assert key instanceof Comparable; - - V val = entry.getValue(); - - if (val == null) { - if (rmvAll == null) - rmvAll = new TreeSet<>(); - - rmvAll.add(key); - } - else { - if (putAll == null) - putAll = new TreeMap<>(); - - putAll.put(key, val); - } - } - - updateAll(cache, rmvAll, putAll); - } - } - - /** - * Cache updater which uses group lock. - */ - private static class GroupLocked<K, V> implements IgniteDataLoadCacheUpdater<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) - throws IgniteCheckedException { - assert cache != null; - assert !F.isEmpty(entries); - - assert cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() != ATOMIC; - - Map<Integer, Integer> partsCounts = new HashMap<>(); - - // Group by partition ID. - Map<Integer, Collection<K>> rmvPartMap = null; - Map<Integer, Map<K, V>> putPartMap = null; - - Ignite ignite = cache.unwrap(Ignite.class); - - GridCacheAffinity<K> aff = ignite.<K, V>cache(cache.getName()).affinity(); - - for (Map.Entry<K, V> entry : entries) { - K key = entry.getKey(); - - assert key != null; - - V val = entry.getValue(); - - int part = aff.partition(key); - - Integer cnt = partsCounts.get(part); - - partsCounts.put(part, cnt == null ? 1 : cnt + 1); - - if (val == null) { - if (rmvPartMap == null) - rmvPartMap = new HashMap<>(); - - F.addIfAbsent(rmvPartMap, part, F.<K>newList()).add(key); - } - else { - if (putPartMap == null) - putPartMap = new HashMap<>(); - - F.addIfAbsent(putPartMap, part, F.<K, V>newMap()).put(key, val); - } - } - - IgniteTransactions txs = ignite.transactions(); - - for (Map.Entry<Integer, Integer> e : partsCounts.entrySet()) { - Integer part = e.getKey(); - int cnt = e.getValue(); - - try (IgniteTx tx = txs.txStartPartition(cache.getName(), part, PESSIMISTIC, REPEATABLE_READ, 0, cnt)) { - updateAll(cache, rmvPartMap == null ? null : rmvPartMap.get(part), - putPartMap == null ? null : putPartMap.get(part)); - - tx.commit(); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadRequest.java deleted file mode 100644 index c7205a0..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadRequest.java +++ /dev/null @@ -1,548 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.dataload; - -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.tostring.*; -import org.jetbrains.annotations.*; - -import java.nio.*; -import java.util.*; - -/** - * - */ -public class GridDataLoadRequest extends GridTcpCommunicationMessageAdapter { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private long reqId; - - /** */ - private byte[] resTopicBytes; - - /** Cache name. */ - private String cacheName; - - /** */ - private byte[] updaterBytes; - - /** Entries to put. */ - private byte[] colBytes; - - /** {@code True} to ignore deployment ownership. */ - private boolean ignoreDepOwnership; - - /** */ - private boolean skipStore; - - /** */ - private IgniteDeploymentMode depMode; - - /** */ - private String sampleClsName; - - /** */ - private String userVer; - - /** Node class loader participants. */ - @GridToStringInclude - @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class) - private Map<UUID, IgniteUuid> ldrParticipants; - - /** */ - private IgniteUuid clsLdrId; - - /** */ - private boolean forceLocDep; - - /** - * {@code Externalizable} support. - */ - public GridDataLoadRequest() { - // No-op. - } - - /** - * @param reqId Request ID. - * @param resTopicBytes Response topic. - * @param cacheName Cache name. - * @param updaterBytes Cache updater. - * @param colBytes Collection bytes. - * @param ignoreDepOwnership Ignore ownership. - * @param skipStore Skip store flag. - * @param depMode Deployment mode. - * @param sampleClsName Sample class name. - * @param userVer User version. - * @param ldrParticipants Loader participants. - * @param clsLdrId Class loader ID. - * @param forceLocDep Force local deployment. - */ - public GridDataLoadRequest(long reqId, - byte[] resTopicBytes, - @Nullable String cacheName, - byte[] updaterBytes, - byte[] colBytes, - boolean ignoreDepOwnership, - boolean skipStore, - IgniteDeploymentMode depMode, - String sampleClsName, - String userVer, - Map<UUID, IgniteUuid> ldrParticipants, - IgniteUuid clsLdrId, - boolean forceLocDep) { - this.reqId = reqId; - this.resTopicBytes = resTopicBytes; - this.cacheName = cacheName; - this.updaterBytes = updaterBytes; - this.colBytes = colBytes; - this.ignoreDepOwnership = ignoreDepOwnership; - this.skipStore = skipStore; - this.depMode = depMode; - this.sampleClsName = sampleClsName; - this.userVer = userVer; - this.ldrParticipants = ldrParticipants; - this.clsLdrId = clsLdrId; - this.forceLocDep = forceLocDep; - } - - /** - * @return Request ID. - */ - public long requestId() { - return reqId; - } - - /** - * @return Response topic. - */ - public byte[] responseTopicBytes() { - return resTopicBytes; - } - - /** - * @return Cache name. - */ - public String cacheName() { - return cacheName; - } - - /** - * @return Updater. - */ - public byte[] updaterBytes() { - return updaterBytes; - } - - /** - * @return Collection bytes. - */ - public byte[] collectionBytes() { - return colBytes; - } - - /** - * @return {@code True} to ignore ownership. - */ - public boolean ignoreDeploymentOwnership() { - return ignoreDepOwnership; - } - - /** - * @return Skip store flag. - */ - public boolean skipStore() { - return skipStore; - } - - /** - * @return Deployment mode. - */ - public IgniteDeploymentMode deploymentMode() { - return depMode; - } - - /** - * @return Sample class name. - */ - public String sampleClassName() { - return sampleClsName; - } - - /** - * @return User version. - */ - public String userVersion() { - return userVer; - } - - /** - * @return Participants. - */ - public Map<UUID, IgniteUuid> participants() { - return ldrParticipants; - } - - /** - * @return Class loader ID. - */ - public IgniteUuid classLoaderId() { - return clsLdrId; - } - - /** - * @return {@code True} to force local deployment. - */ - public boolean forceLocalDeployment() { - return forceLocDep; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDataLoadRequest.class, this); - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 0: - if (!commState.putString(cacheName)) - return false; - - commState.idx++; - - case 1: - if (!commState.putGridUuid(clsLdrId)) - return false; - - commState.idx++; - - case 2: - if (!commState.putByteArray(colBytes)) - return false; - - commState.idx++; - - case 3: - if (!commState.putEnum(depMode)) - return false; - - commState.idx++; - - case 4: - if (!commState.putBoolean(forceLocDep)) - return false; - - commState.idx++; - - case 5: - if (!commState.putBoolean(ignoreDepOwnership)) - return false; - - commState.idx++; - - case 6: - if (ldrParticipants != null) { - if (commState.it == null) { - if (!commState.putInt(ldrParticipants.size())) - return false; - - commState.it = ldrParticipants.entrySet().iterator(); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - Map.Entry<UUID, IgniteUuid> e = (Map.Entry<UUID, IgniteUuid>)commState.cur; - - if (!commState.keyDone) { - if (!commState.putUuid(e.getKey())) - return false; - - commState.keyDone = true; - } - - if (!commState.putGridUuid(e.getValue())) - return false; - - commState.keyDone = false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - - case 7: - if (!commState.putLong(reqId)) - return false; - - commState.idx++; - - case 8: - if (!commState.putByteArray(resTopicBytes)) - return false; - - commState.idx++; - - case 9: - if (!commState.putString(sampleClsName)) - return false; - - commState.idx++; - - case 10: - if (!commState.putBoolean(skipStore)) - return false; - - commState.idx++; - - case 11: - if (!commState.putByteArray(updaterBytes)) - return false; - - commState.idx++; - - case 12: - if (!commState.putString(userVer)) - return false; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - switch (commState.idx) { - case 0: - String cacheName0 = commState.getString(); - - if (cacheName0 == STR_NOT_READ) - return false; - - cacheName = cacheName0; - - commState.idx++; - - case 1: - IgniteUuid clsLdrId0 = commState.getGridUuid(); - - if (clsLdrId0 == GRID_UUID_NOT_READ) - return false; - - clsLdrId = clsLdrId0; - - commState.idx++; - - case 2: - byte[] colBytes0 = commState.getByteArray(); - - if (colBytes0 == BYTE_ARR_NOT_READ) - return false; - - colBytes = colBytes0; - - commState.idx++; - - case 3: - if (buf.remaining() < 1) - return false; - - byte depMode0 = commState.getByte(); - - depMode = IgniteDeploymentMode.fromOrdinal(depMode0); - - commState.idx++; - - case 4: - if (buf.remaining() < 1) - return false; - - forceLocDep = commState.getBoolean(); - - commState.idx++; - - case 5: - if (buf.remaining() < 1) - return false; - - ignoreDepOwnership = commState.getBoolean(); - - commState.idx++; - - case 6: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (ldrParticipants == null) - ldrParticipants = new HashMap<>(commState.readSize, 1.0f); - - for (int i = commState.readItems; i < commState.readSize; i++) { - if (!commState.keyDone) { - UUID _val = commState.getUuid(); - - if (_val == UUID_NOT_READ) - return false; - - commState.cur = _val; - commState.keyDone = true; - } - - IgniteUuid _val = commState.getGridUuid(); - - if (_val == GRID_UUID_NOT_READ) - return false; - - ldrParticipants.put((UUID)commState.cur, _val); - - commState.keyDone = false; - - commState.readItems++; - } - } - - commState.readSize = -1; - commState.readItems = 0; - commState.cur = null; - - commState.idx++; - - case 7: - if (buf.remaining() < 8) - return false; - - reqId = commState.getLong(); - - commState.idx++; - - case 8: - byte[] resTopicBytes0 = commState.getByteArray(); - - if (resTopicBytes0 == BYTE_ARR_NOT_READ) - return false; - - resTopicBytes = resTopicBytes0; - - commState.idx++; - - case 9: - String sampleClsName0 = commState.getString(); - - if (sampleClsName0 == STR_NOT_READ) - return false; - - sampleClsName = sampleClsName0; - - commState.idx++; - - case 10: - if (buf.remaining() < 1) - return false; - - skipStore = commState.getBoolean(); - - commState.idx++; - - case 11: - byte[] updaterBytes0 = commState.getByteArray(); - - if (updaterBytes0 == BYTE_ARR_NOT_READ) - return false; - - updaterBytes = updaterBytes0; - - commState.idx++; - - case 12: - String userVer0 = commState.getString(); - - if (userVer0 == STR_NOT_READ) - return false; - - userVer = userVer0; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 61; - } - - /** {@inheritDoc} */ - @Override public GridTcpCommunicationMessageAdapter clone() { - GridDataLoadRequest _clone = new GridDataLoadRequest(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - GridDataLoadRequest _clone = (GridDataLoadRequest)_msg; - - _clone.reqId = reqId; - _clone.resTopicBytes = resTopicBytes; - _clone.cacheName = cacheName; - _clone.updaterBytes = updaterBytes; - _clone.colBytes = colBytes; - _clone.ignoreDepOwnership = ignoreDepOwnership; - _clone.skipStore = skipStore; - _clone.depMode = depMode; - _clone.sampleClsName = sampleClsName; - _clone.userVer = userVer; - _clone.ldrParticipants = ldrParticipants; - _clone.clsLdrId = clsLdrId; - _clone.forceLocDep = forceLocDep; - } -}
