http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java index 0000000,253ac18..e7067f2 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java @@@ -1,0 -1,361 +1,348 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.managers.communication; + + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.internal.util.tostring.*; + + import java.io.*; + import java.nio.*; + + /** + * Wrapper for all grid messages. + */ + public class GridIoMessage extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** Policy. */ + private GridIoPolicy plc; + + /** Message topic. */ + @GridToStringInclude + @GridDirectTransient + private Object topic; + + /** Topic bytes. */ + private byte[] topicBytes; + + /** Topic ordinal. */ + private int topicOrd = -1; + - /** Message order. */ - private long msgId = -1; ++ /** Message ordered flag. */ ++ private boolean ordered; + + /** Message timeout. */ + private long timeout; + + /** Whether message can be skipped on timeout. */ + private boolean skipOnTimeout; + + /** Message. */ + private GridTcpCommunicationMessageAdapter msg; + + /** + * No-op constructor to support {@link Externalizable} interface. + * This constructor is not meant to be used for other purposes. + */ + public GridIoMessage() { + // No-op. + } + + /** + * @param plc Policy. + * @param topic Communication topic. + * @param topicOrd Topic ordinal value. + * @param msg Message. - * @param msgId Message ID. ++ * @param ordered Message ordered flag. + * @param timeout Timeout. + * @param skipOnTimeout Whether message can be skipped on timeout. 
+ */ - public GridIoMessage(GridIoPolicy plc, Object topic, int topicOrd, GridTcpCommunicationMessageAdapter msg, - long msgId, long timeout, boolean skipOnTimeout) { ++ public GridIoMessage( ++ GridIoPolicy plc, ++ Object topic, ++ int topicOrd, ++ GridTcpCommunicationMessageAdapter msg, ++ boolean ordered, ++ long timeout, ++ boolean skipOnTimeout ++ ) { + assert plc != null; + assert topic != null; + assert topicOrd <= Byte.MAX_VALUE; + assert msg != null; + + this.plc = plc; + this.msg = msg; + this.topic = topic; + this.topicOrd = topicOrd; - this.msgId = msgId; ++ this.ordered = ordered; + this.timeout = timeout; + this.skipOnTimeout = skipOnTimeout; + } + + /** + * @return Policy. + */ + GridIoPolicy policy() { + return plc; + } + + /** + * @return Topic. + */ + Object topic() { + return topic; + } + + /** + * @param topic Topic. + */ + void topic(Object topic) { + this.topic = topic; + } + + /** + * @return Topic bytes. + */ + byte[] topicBytes() { + return topicBytes; + } + + /** + * @param topicBytes Topic bytes. + */ + void topicBytes(byte[] topicBytes) { + this.topicBytes = topicBytes; + } + + /** + * @return Topic ordinal. + */ + int topicOrdinal() { + return topicOrd; + } + + /** + * @return Message. + */ + public Object message() { + return msg; + } + + /** - * @return Message ID. - */ - long messageId() { - return msgId; - } - - /** + * @return Message timeout. + */ + public long timeout() { + return timeout; + } + + /** + * @return Whether message can be skipped on timeout. + */ + public boolean skipOnTimeout() { + return skipOnTimeout; + } + + /** + * @return {@code True} if message is ordered, {@code false} otherwise. + */ + boolean isOrdered() { - return msgId > 0; ++ return ordered; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { - if (obj == this) - return true; - - if (!(obj instanceof GridIoMessage)) - return false; - - GridIoMessage other = (GridIoMessage)obj; - - return topic.equals(other.topic) && msgId == other.msgId; ++ throw new AssertionError(); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { - int res = topic.hashCode(); - - res = 31 * res + (int)(msgId ^ (msgId >>> 32)); - res = 31 * res + topic.hashCode(); - - return res; ++ throw new AssertionError(); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridIoMessage _clone = new GridIoMessage(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @SuppressWarnings("RedundantCast") + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridIoMessage _clone = (GridIoMessage)_msg; + + _clone.plc = plc; + _clone.topic = topic; + _clone.topicBytes = topicBytes; + _clone.topicOrd = topicOrd; - _clone.msgId = msgId; ++ _clone.ordered = ordered; + _clone.timeout = timeout; + _clone.skipOnTimeout = skipOnTimeout; + _clone.msg = msg != null ? 
(GridTcpCommunicationMessageAdapter)msg.clone() : null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: + if (!commState.putMessage(msg)) + return false; + + commState.idx++; + + case 1: - if (!commState.putLong(msgId)) ++ if (!commState.putBoolean(ordered)) + return false; + + commState.idx++; + + case 2: + if (!commState.putEnum(plc)) + return false; + + commState.idx++; + + case 3: + if (!commState.putBoolean(skipOnTimeout)) + return false; + + commState.idx++; + + case 4: + if (!commState.putLong(timeout)) + return false; + + commState.idx++; + + case 5: + if (!commState.putByteArray(topicBytes)) + return false; + + commState.idx++; + + case 6: + if (!commState.putInt(topicOrd)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: + Object msg0 = commState.getMessage(); + + if (msg0 == MSG_NOT_READ) + return false; + + msg = (GridTcpCommunicationMessageAdapter)msg0; + + commState.idx++; + + case 1: - if (buf.remaining() < 8) ++ if (buf.remaining() < 1) + return false; + - msgId = commState.getLong(); ++ ordered = commState.getBoolean(); + + commState.idx++; + + case 2: + if (buf.remaining() < 1) + return false; + + byte plc0 = commState.getByte(); + + plc = GridIoPolicy.fromOrdinal(plc0); + + commState.idx++; + + case 3: + if (buf.remaining() < 1) + return false; + + skipOnTimeout = commState.getBoolean(); + + commState.idx++; + + case 4: + if (buf.remaining() < 8) + return false; + + timeout = commState.getLong(); + + commState.idx++; + + case 5: + byte[] topicBytes0 = commState.getByteArray(); + + if (topicBytes0 == BYTE_ARR_NOT_READ) + return false; + + topicBytes = topicBytes0; + + commState.idx++; + + case 6: + if (buf.remaining() < 4) + return false; + + topicOrd = commState.getInt(); + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 8; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridIoMessage.class, this); + } + }
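The change above replaces GridIoMessage's per-message 'msgId' long with a one-byte 'ordered' flag: writeTo()/readFrom() now move a boolean instead of a long (the guard drops from buf.remaining() < 8 to < 1), and isOrdered() reads the flag directly rather than testing msgId > 0. The serialization keeps its resumable state-machine shape: commState.idx records the next field, each switch case falls through to the next, and returning false means the buffer filled up and the call must be repeated after a flush. Below is a minimal self-contained sketch of that pattern; it is not part of the patch, and the class and field names are illustrative only.

import java.nio.ByteBuffer;

/**
 * Minimal sketch of the resumable-write pattern used by GridIoMessage.writeTo():
 * a field index records progress, each case intentionally falls through to the
 * next, and a false return means "buffer full, flush it and call again".
 */
public class ResumableWriteSketch {
    /** Hypothetical payload fields, mirroring the ordered flag and timeout. */
    private final boolean ordered = true;
    private final long timeout = 5000L;

    /** Index of the next field to write; survives across calls. */
    private int idx;

    /** @return {@code true} once all fields are written, {@code false} to retry. */
    public boolean writeTo(ByteBuffer buf) {
        switch (idx) {
            case 0:
                if (buf.remaining() < 1)
                    return false;

                buf.put((byte)(ordered ? 1 : 0)); // 1 byte, where msgId needed 8.

                idx++;

            case 1:
                if (buf.remaining() < 8)
                    return false;

                buf.putLong(timeout);

                idx++;
        }

        return true;
    }

    public static void main(String[] args) {
        ResumableWriteSketch msg = new ResumableWriteSketch();

        ByteBuffer buf = ByteBuffer.allocate(8); // Too small for both fields at once.

        while (!msg.writeTo(buf))
            buf.clear(); // Pretend the buffer was flushed to the socket.

        System.out.println("Message fully written in more than one pass.");
    }
}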
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 0000000,b720eb1..837b162 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@@ -1,0 -1,865 +1,847 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache; + + import org.apache.ignite.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.internal.util.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.managers.communication.*; + import org.apache.ignite.internal.managers.deployment.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jdk8.backport.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.util.*; + import java.util.concurrent.*; + import java.util.concurrent.atomic.*; + + import static org.apache.ignite.internal.GridTopic.*; + import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; + + /** + * Cache communication manager. + */ + public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { + /** Message ID generator. */ + private static final AtomicLong idGen = new AtomicLong(); + + /** Delay in milliseconds between retries. */ + private long retryDelay; + + /** Number of retries used to send messages. */ + private int retryCnt; + + /** Indexed class handlers. */ + private Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>(); + + /** Handler registry. */ + private ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage<K, V>>> + clsHandlers = new ConcurrentHashMap8<>(); + + /** Ordered handler registry. */ + private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage<K, V>>> orderedHandlers = + new ConcurrentHashMap8<>(); + + /** Stopping flag. */ + private boolean stopping; + + /** Error flag. */ + private final AtomicBoolean startErr = new AtomicBoolean(); + + /** Mutex. */ + private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock(); + + /** Deployment enabled. */ + private boolean depEnabled; + + /** Message listener. 
*/ + private GridMessageListener lsnr = new GridMessageListener() { + @SuppressWarnings("unchecked") + @Override public void onMessage(final UUID nodeId, Object msg) { + if (log.isDebugEnabled()) + log.debug("Received unordered cache communication message [nodeId=" + nodeId + + ", locId=" + cctx.localNodeId() + ", msg=" + msg + ']'); + + final GridCacheMessage<K, V> cacheMsg = (GridCacheMessage<K, V>)msg; + + int msgIdx = cacheMsg.lookupIndex(); + + IgniteBiInClosure<UUID, GridCacheMessage<K, V>> c = null; + + if (msgIdx >= 0) { + IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers.get(cacheMsg.cacheId()); + + if (cacheClsHandlers != null) + c = cacheClsHandlers[msgIdx]; + } + + if (c == null) + c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), cacheMsg.getClass())); + + if (c == null) { + if (log.isDebugEnabled()) + log.debug("Received message without registered handler (will ignore) [msg=" + msg + + ", nodeId=" + nodeId + ']'); + + return; + } + + long locTopVer = cctx.discovery().topologyVersion(); + long rmtTopVer = cacheMsg.topologyVersion(); + + if (locTopVer < rmtTopVer) { + if (log.isDebugEnabled()) + log.debug("Received message has higher topology version [msg=" + msg + + ", locTopVer=" + locTopVer + ", rmtTopVer=" + rmtTopVer + ']'); + + IgniteFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer); + + if (!topFut.isDone()) { + final IgniteBiInClosure<UUID, GridCacheMessage<K, V>> c0 = c; + + topFut.listenAsync(new CI1<IgniteFuture<Long>>() { + @Override public void apply(IgniteFuture<Long> t) { + onMessage0(nodeId, cacheMsg, c0); + } + }); + + return; + } + } + + onMessage0(nodeId, cacheMsg, c); + } + }; + + /** {@inheritDoc} */ + @Override public void start0() throws IgniteCheckedException { + retryDelay = cctx.gridConfig().getNetworkSendRetryDelay(); + retryCnt = cctx.gridConfig().getNetworkSendRetryCount(); + + depEnabled = cctx.gridDeploy().enabled(); + + cctx.gridIO().addMessageListener(TOPIC_CACHE, lsnr); + } + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override protected void onKernalStop0(boolean cancel) { + cctx.gridIO().removeMessageListener(TOPIC_CACHE); + + for (Object ordTopic : orderedHandlers.keySet()) + cctx.gridIO().removeMessageListener(ordTopic); + + boolean interrupted = false; + + // Busy wait is intentional. + while (true) { + try { + if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS)) + break; + else + Thread.sleep(200); + } + catch (InterruptedException ignore) { + // Preserve interrupt status & ignore. + // Note that interrupted flag is cleared. + interrupted = true; + } + } + + if (interrupted) + Thread.currentThread().interrupt(); + + try { + stopping = true; + } + finally { + rw.writeUnlock(); + } + } + + /** + * @param nodeId Node ID. + * @param cacheMsg Cache message. + * @param c Handler closure. 
+ */ + private void onMessage0(final UUID nodeId, final GridCacheMessage<K, V> cacheMsg, + final IgniteBiInClosure<UUID, GridCacheMessage<K, V>> c) { + rw.readLock(); + + try { + if (stopping) { + if (log.isDebugEnabled()) + log.debug("Received cache communication message while stopping (will ignore) [nodeId=" + + nodeId + ", msg=" + cacheMsg + ']'); + + return; + } + + if (depEnabled) + cctx.deploy().ignoreOwnership(true); + + unmarshall(nodeId, cacheMsg); + + if (cacheMsg.allowForStartup()) + processMessage(nodeId, cacheMsg, c); + else { + IgniteFuture<?> startFut = startFuture(cacheMsg); + + if (startFut.isDone()) + processMessage(nodeId, cacheMsg, c); + else { + if (log.isDebugEnabled()) + log.debug("Waiting for start future to complete for message [nodeId=" + nodeId + + ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']'); + + // Don't hold this thread waiting for preloading to complete. + startFut.listenAsync(new CI1<IgniteFuture<?>>() { + @Override public void apply(IgniteFuture<?> f) { + rw.readLock(); + + try { + if (stopping) { + if (log.isDebugEnabled()) + log.debug("Received cache communication message while stopping " + + "(will ignore) [nodeId=" + nodeId + ", msg=" + cacheMsg + ']'); + + return; + } + + f.get(); + + if (log.isDebugEnabled()) + log.debug("Start future completed for message [nodeId=" + nodeId + + ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']'); + + processMessage(nodeId, cacheMsg, c); + } + catch (IgniteCheckedException e) { + // Log once. + if (startErr.compareAndSet(false, true)) + U.error(log, "Failed to complete preload start future (will ignore message) " + + "[fut=" + f + ", nodeId=" + nodeId + ", msg=" + cacheMsg + ']', e); + } + finally { + rw.readUnlock(); + } + } + }); + } + } + } + catch (Throwable e) { + if (X.hasCause(e, ClassNotFoundException.class)) + U.error(log, "Failed to process message (note that distributed services " + + "do not support peer class loading, if you deploy distributed service " + + "you should have all required classes in CLASSPATH on all nodes in topology) " + + "[senderId=" + nodeId + ", err=" + X.cause(e, ClassNotFoundException.class).getMessage() + ']'); + else + U.error(log, "Failed to process message [senderId=" + nodeId + ']', e); + } + finally { + if (depEnabled) + cctx.deploy().ignoreOwnership(false); + + rw.readUnlock(); + } + } + + /** + * @param cacheMsg Cache message to get start future. + * @return Preloader start future. + */ + private IgniteFuture<Object> startFuture(GridCacheMessage<K, V> cacheMsg) { + int cacheId = cacheMsg.cacheId(); + + return cacheId != 0 ? cctx.cacheContext(cacheId).preloader().startFuture() : cctx.preloadersStartFuture(); + } + + /** + * @param nodeId Node ID. + * @param msg Message. + * @param c Closure. + */ + private void processMessage(UUID nodeId, GridCacheMessage<K, V> msg, + IgniteBiInClosure<UUID, GridCacheMessage<K, V>> c) { + try { + // Start clean. + if (msg.transactional()) + CU.resetTxContext(cctx); + + // We will not end up with storing a bunch of new UUIDs + // in each cache entry, since node ID is stored in NIO session + // on handshake. + c.apply(nodeId, msg); + + if (log.isDebugEnabled()) + log.debug("Finished processing cache communication message [nodeId=" + nodeId + ", msg=" + msg + ']'); + } + catch (Throwable e) { + U.error(log, "Failed processing message [senderId=" + nodeId + ']', e); + } + finally { + // Clear thread-local tx contexts. + CU.resetTxContext(cctx); + + // Unwind eviction notifications. 
+ CU.unwindEvicts(cctx); + } + } + + /** + * Pre-processes message prior to send. + * + * @param msg Message to send. + * @param destNodeId Destination node ID. + * @throws IgniteCheckedException If failed. + */ + private void onSend(GridCacheMessage<K, V> msg, @Nullable UUID destNodeId) throws IgniteCheckedException { + if (msg.messageId() < 0) + // Generate and set message ID. + msg.messageId(idGen.incrementAndGet()); + + if (destNodeId == null || !cctx.localNodeId().equals(destNodeId)) { + msg.prepareMarshal(cctx); + + if (depEnabled && msg instanceof GridCacheDeployable) + cctx.deploy().prepare((GridCacheDeployable)msg); + } + } + + /** + * Sends communication message. + * + * @param node Node to send the message to. + * @param msg Message to send. + * @throws IgniteCheckedException If sending failed. + * @throws ClusterTopologyException If receiver left. + */ + public void send(ClusterNode node, GridCacheMessage<K, V> msg) throws IgniteCheckedException { + send(node, msg, SYSTEM_POOL); + } + + /** + * Sends communication message. + * + * @param node Node to send the message to. + * @param msg Message to send. + * @throws IgniteCheckedException If sending failed. + * @throws ClusterTopologyException If receiver left. + */ + public void send(ClusterNode node, GridCacheMessage<K, V> msg, GridIoPolicy plc) throws IgniteCheckedException { + assert !node.isLocal(); + + onSend(msg, node.id()); + + if (log.isDebugEnabled()) + log.debug("Sending cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']'); + + int cnt = 0; + boolean first = true; + + while (cnt <= retryCnt) { + try { + cnt++; + + GridCacheMessage<K, V> msg0; + + if (first) { + msg0 = msg; + + first = false; + } + else + msg0 = (GridCacheMessage<K, V>)msg.clone(); + + cctx.gridIO().send(node, TOPIC_CACHE, msg0, plc); + + return; + } + catch (IgniteCheckedException e) { + if (!cctx.discovery().alive(node.id()) || !cctx.discovery().pingNode(node.id())) + throw new ClusterTopologyException("Node left grid while sending message to: " + node.id(), e); + + if (cnt == retryCnt) + throw e; + else if (log.isDebugEnabled()) + log.debug("Failed to send message to node (will retry): " + node.id()); + } + + U.sleep(retryDelay); + } + + if (log.isDebugEnabled()) + log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']'); + } + + /** + * Sends message and automatically accounts for left nodes. + * + * @param nodes Nodes to send to. + * @param msg Message to send. + * @param fallback Callback for failed nodes. + * @return {@code True} if nodes are empty or message was sent, {@code false} if + * all nodes have left topology while sending this message. + * @throws IgniteCheckedException If send failed. + */ + @SuppressWarnings( {"BusyWait"}) + public boolean safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage<K, V> msg, + @Nullable IgnitePredicate<ClusterNode> fallback) throws IgniteCheckedException { + assert nodes != null; + assert msg != null; + + if (nodes.isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Message will not be sent as collection of nodes is empty: " + msg); + + return true; + } + + onSend(msg, null); + + if (log.isDebugEnabled()) + log.debug("Sending cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']'); + + final Collection<UUID> leftIds = new GridLeanSet<>(); + + int cnt = 0; + boolean first = true; + + while (cnt < retryCnt) { + try { + Collection<? 
extends ClusterNode> nodesView = F.view(nodes, new P1<ClusterNode>() { + @Override public boolean apply(ClusterNode e) { + return !leftIds.contains(e.id()); + } + }); + + GridCacheMessage<K, V> msg0; + + if (first) { + msg0 = msg; + + first = false; + } + else + msg0 = (GridCacheMessage<K, V>)msg.clone(); + + cctx.gridIO().send(nodesView, TOPIC_CACHE, msg0, SYSTEM_POOL); + + boolean added = false; + + // Even if there is no exception, we still check here, as node could have + // ignored the message during stopping. + for (ClusterNode n : nodes) { + if (!leftIds.contains(n.id()) && !cctx.discovery().alive(n.id())) { + leftIds.add(n.id()); + + if (fallback != null && !fallback.apply(n)) + // If fallback signalled to stop. + return false; + + added = true; + } + } + + if (added) { + if (!F.exist(F.nodeIds(nodes), F0.not(F.contains(leftIds)))) { + if (log.isDebugEnabled()) + log.debug("Message will not be sent because all nodes left topology [msg=" + msg + + ", nodes=" + U.toShortString(nodes) + ']'); + + return false; + } + } + + break; + } + catch (IgniteCheckedException e) { + boolean added = false; + + for (ClusterNode n : nodes) { + if (!leftIds.contains(n.id()) && + (!cctx.discovery().alive(n.id()) || !cctx.discovery().pingNode(n.id()))) { + leftIds.add(n.id()); + + if (fallback != null && !fallback.apply(n)) + // If fallback signalled to stop. + return false; + + added = true; + } + } + + if (!added) { + cnt++; + + if (cnt == retryCnt) + throw e; + + U.sleep(retryDelay); + } + + if (!F.exist(F.nodeIds(nodes), F0.not(F.contains(leftIds)))) { + if (log.isDebugEnabled()) + log.debug("Message will not be sent because all nodes left topology [msg=" + msg + ", nodes=" + + U.toShortString(nodes) + ']'); + + return false; + } + + if (log.isDebugEnabled()) + log.debug("Message send will be retried [msg=" + msg + ", nodes=" + U.toShortString(nodes) + + ", leftIds=" + leftIds + ']'); + } + } + + if (log.isDebugEnabled()) + log.debug("Sent cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']'); + + return true; + } + + /** + * Sends communication message. + * + * @param nodeId ID of node to send the message to. + * @param msg Message to send. + * @throws IgniteCheckedException If sending failed. + */ + public void send(UUID nodeId, GridCacheMessage<K, V> msg) throws IgniteCheckedException { + ClusterNode n = cctx.discovery().node(nodeId); + + if (n == null) + throw new ClusterTopologyException("Failed to send message because node left grid [node=" + n + ", msg=" + + msg + ']'); + + send(n, msg); + } + + /** + * Sends communication message. + * + * @param nodeId ID of node to send the message to. + * @param msg Message to send. + * @throws IgniteCheckedException If sending failed. + */ + public void send(UUID nodeId, GridCacheMessage<K, V> msg, GridIoPolicy plc) throws IgniteCheckedException { + ClusterNode n = cctx.discovery().node(nodeId); + + if (n == null) + throw new ClusterTopologyException("Failed to send message because node left grid [node=" + n + ", msg=" + + msg + ']'); + + send(n, msg, plc); + } + + /** + * @param node Destination node. + * @param topic Topic to send the message to. - * @param msgId Ordered message ID. + * @param msg Message to send. + * @param timeout Timeout to keep a message on receiving queue. + * @throws IgniteCheckedException Thrown in case of any errors. 
+ */ - public void sendOrderedMessage(ClusterNode node, Object topic, long msgId, GridCacheMessage<K, V> msg, ++ public void sendOrderedMessage(ClusterNode node, Object topic, GridCacheMessage<K, V> msg, + long timeout) throws IgniteCheckedException { + onSend(msg, node.id()); + + int cnt = 0; + + while (cnt <= retryCnt) { + try { + cnt++; + - cctx.gridIO().sendOrderedMessage(node, topic, msgId, msg, SYSTEM_POOL, timeout, false); ++ cctx.gridIO().sendOrderedMessage(node, topic, msg, SYSTEM_POOL, timeout, false); + + if (log.isDebugEnabled()) + log.debug("Sent ordered cache message [topic=" + topic + ", msg=" + msg + + ", nodeId=" + node.id() + ']'); + + return; + } + catch (IgniteCheckedException e) { + if (cctx.discovery().node(node.id()) == null) + throw new ClusterTopologyException("Node left grid while sending ordered message to: " + node.id(), e); + + if (cnt == retryCnt) + throw e; + else if (log.isDebugEnabled()) + log.debug("Failed to send message to node (will retry): " + node.id()); + } + + U.sleep(retryDelay); + } + } + + /** - * @param topic Message topic. - * @param nodeId Node ID. - * @return Next ordered message ID. - */ - public long messageId(Object topic, UUID nodeId) { - return cctx.gridIO().nextMessageId(topic, nodeId); - } - - /** + * @return Next ID based on the local counter. + */ + public long nextIoId() { + return idGen.incrementAndGet(); + } + + /** + * Adds message handler. + * + * @param cacheId Cache ID. + * @param type Type of message. + * @param c Handler. + */ + @SuppressWarnings({"unchecked"}) + public void addHandler( + int cacheId, + Class<? extends GridCacheMessage> type, + IgniteBiInClosure<UUID, ? extends GridCacheMessage<K, V>> c) { + int msgIdx = messageIndex(type); + + if (msgIdx != -1) { + IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers.get(cacheId); + + if (cacheClsHandlers == null) { + cacheClsHandlers = new IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX]; + + idxClsHandlers.put(cacheId, cacheClsHandlers); + } + + if (cacheClsHandlers[msgIdx] != null) + throw new IgniteException("Duplicate cache message ID found [cacheId=" + cacheId + + ", type=" + type + ']'); + + cacheClsHandlers[msgIdx] = c; + + return; + } + else { + ListenerKey key = new ListenerKey(cacheId, type); + + if (clsHandlers.putIfAbsent(key, + (IgniteBiInClosure<UUID, GridCacheMessage<K, V>>)c) != null) + assert false : "Handler for class already registered [cacheId=" + cacheId + ", cls=" + type + + ", old=" + clsHandlers.get(key) + ", new=" + c + ']'; + } + + if (log != null && log.isDebugEnabled()) + log.debug("Registered cache communication handler [cacheId=" + cacheId + ", type=" + type + + ", msgIdx=" + msgIdx + ", handler=" + c + ']'); + } + + /** + * @param lsnr Listener to add. + */ + public void addDisconnectListener(GridDisconnectListener lsnr) { + cctx.kernalContext().io().addDisconnectListener(lsnr); + } + + /** + * @param msgCls Message class to check. + * @return Message index. + */ + private int messageIndex(Class<?> msgCls) { + try { + Integer msgIdx = U.field(msgCls, GridCacheMessage.CACHE_MSG_INDEX_FIELD_NAME); + + if (msgIdx == null || msgIdx < 0) + return -1; + + return msgIdx; + } + catch (IgniteCheckedException ignored) { + return -1; + } + } + + /** + * Removes message handler. + * + * @param type Type of message. + * @param c Handler. 
+ */ + public void removeHandler(Class<?> type, IgniteBiInClosure<UUID, ?> c) { + assert type != null; + assert c != null; + + boolean res = clsHandlers.remove(type, c); + + if (log != null && log.isDebugEnabled()) { + if (res) { + log.debug("Removed cache communication handler " + + "[type=" + type + ", handler=" + c + ']'); + } + else { + log.debug("Cache communication handler is not registered " + + "[type=" + type + ", handler=" + c + ']'); + } + } + } + + /** + * Adds ordered message handler. + * + * @param topic Topic. + * @param c Handler. + */ + @SuppressWarnings({"unchecked"}) + public void addOrderedHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage<K, V>> c) { + if (orderedHandlers.putIfAbsent(topic, c) == null) { + cctx.gridIO().addMessageListener(topic, new OrderedMessageListener( + (IgniteBiInClosure<UUID, GridCacheMessage<K, V>>)c)); + + if (log != null && log.isDebugEnabled()) + log.debug("Registered ordered cache communication handler [topic=" + topic + ", handler=" + c + ']'); + } + else if (log != null) + U.warn(log, "Failed to register ordered cache communication handler because it is already " + + "registered for this topic [topic=" + topic + ", handler=" + c + ']'); + } + + /** + * Removes ordered message handler. + * + * @param topic Topic. + */ + public void removeOrderedHandler(Object topic) { + if (orderedHandlers.remove(topic) != null) { - cctx.gridIO().removeMessageId(topic); + cctx.gridIO().removeMessageListener(topic); + + if (log != null && log.isDebugEnabled()) + log.debug("Unregistered ordered cache communication handler for topic:" + topic); + } + else if (log != null) + U.warn(log, "Failed to unregister ordered cache communication handler because it was not found " + + "for topic: " + topic); + } + + /** - * @param topic Message topic. - */ - public void removeMessageId(Object topic) { - cctx.gridIO().removeMessageId(topic); - } - - /** + * @param nodeId Sender node ID. + * @param cacheMsg Message. + * @throws IgniteCheckedException If failed. 
+ */ + @SuppressWarnings("ErrorNotRethrown") + private void unmarshall(UUID nodeId, GridCacheMessage<K, V> cacheMsg) throws IgniteCheckedException { + if (cctx.localNodeId().equals(nodeId)) + return; + + GridDeploymentInfo bean = cacheMsg.deployInfo(); + + if (bean != null) { + assert depEnabled : "Received deployment info while peer class loading is disabled [nodeId=" + nodeId + + ", msg=" + cacheMsg + ']'; + + cctx.deploy().p2pContext(nodeId, bean.classLoaderId(), bean.userVersion(), + bean.deployMode(), bean.participants(), bean.localDeploymentOwner()); + + if (log.isDebugEnabled()) + log.debug("Set P2P context [senderId=" + nodeId + ", msg=" + cacheMsg + ']'); + } + + try { + cacheMsg.finishUnmarshal(cctx, cctx.deploy().globalLoader()); + } + catch (IgniteCheckedException e) { + if (cacheMsg.ignoreClassErrors() && X.hasCause(e, InvalidClassException.class, + ClassNotFoundException.class, NoClassDefFoundError.class, UnsupportedClassVersionError.class)) + cacheMsg.onClassError(e); + else + throw e; + } + catch (Error e) { + if (cacheMsg.ignoreClassErrors() && X.hasCause(e, NoClassDefFoundError.class, + UnsupportedClassVersionError.class)) + cacheMsg.onClassError(new IgniteCheckedException("Failed to load class during unmarshalling: " + e, e)); + else + throw e; + } + } + + /** {@inheritDoc} */ + @Override public void printMemoryStats() { + X.println(">>> "); + X.println(">>> Cache IO manager memory stats [grid=" + cctx.gridName() + ']'); + X.println(">>> clsHandlersSize: " + clsHandlers.size()); + X.println(">>> orderedHandlersSize: " + orderedHandlers.size()); + } + + /** + * Ordered message listener. + */ + private class OrderedMessageListener implements GridMessageListener { + /** */ + private final IgniteBiInClosure<UUID, GridCacheMessage<K, V>> c; + + /** + * @param c Handler closure. + */ + OrderedMessageListener(IgniteBiInClosure<UUID, GridCacheMessage<K, V>> c) { + this.c = c; + } + + /** {@inheritDoc} */ + @SuppressWarnings( {"CatchGenericClass", "unchecked"}) + @Override public void onMessage(final UUID nodeId, Object msg) { + if (log.isDebugEnabled()) + log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']'); + + final GridCacheMessage<K, V> cacheMsg = (GridCacheMessage<K, V>)msg; + + onMessage0(nodeId, cacheMsg, c); + } + } + + private static class ListenerKey { + /** Cache ID. */ + private int cacheId; + + /** Message class. */ + private Class<? extends GridCacheMessage> msgCls; + + /** + * @param cacheId Cache ID. + * @param msgCls Message class. + */ + private ListenerKey(int cacheId, Class<? 
extends GridCacheMessage> msgCls) { + this.cacheId = cacheId; + this.msgCls = msgCls; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof ListenerKey)) + return false; + + ListenerKey that = (ListenerKey)o; + + return cacheId == that.cacheId && msgCls.equals(that.msgCls); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = cacheId; + + result = 31 * result + msgCls.hashCode(); + + return result; + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 0000000,30b536c..5011e35 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@@ -1,0 -1,1139 +1,1129 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + + import org.apache.ignite.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.events.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.util.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.thread.*; + import org.apache.ignite.internal.processors.cache.distributed.dht.*; + import org.apache.ignite.internal.processors.timeout.*; + import org.apache.ignite.internal.util.future.*; + import org.apache.ignite.internal.util.tostring.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.internal.util.worker.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.util.*; + import java.util.concurrent.*; + import java.util.concurrent.atomic.*; + import java.util.concurrent.locks.*; + + import static java.util.concurrent.TimeUnit.*; + import static org.apache.ignite.events.IgniteEventType.*; + import static org.apache.ignite.internal.GridTopic.*; + import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; + import static org.apache.ignite.internal.processors.dr.GridDrType.*; + + /** + * Thread pool for requesting partitions from other nodes + * and populating local cache. 
+ */ + @SuppressWarnings("NonConstantFieldWithUpperCaseName") + public class GridDhtPartitionDemandPool<K, V> { + /** Dummy message to wake up a blocking queue if a node leaves. */ + private final SupplyMessage<K, V> DUMMY_TOP = new SupplyMessage<>(); + + /** */ + private final GridCacheContext<K, V> cctx; + + /** */ + private final IgniteLogger log; + + /** */ + private final ReadWriteLock busyLock; + + /** */ - private GridDhtPartitionTopology<K, V> top; - - /** */ + @GridToStringInclude + private final Collection<DemandWorker> dmdWorkers; + + /** Preload predicate. */ + private IgnitePredicate<GridCacheEntryInfo<K, V>> preloadPred; + + /** Future for preload mode {@link org.apache.ignite.cache.CachePreloadMode#SYNC}. */ + @GridToStringInclude + private SyncFuture syncFut; + + /** Preload timeout. */ + private final AtomicLong timeout; + + /** Allows demand threads to synchronize their step. */ + private CyclicBarrier barrier; + + /** Demand lock. */ + private final ReadWriteLock demandLock = new ReentrantReadWriteLock(); + + /** */ + private int poolSize; + + /** Last timeout object. */ + private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>(); + + /** Last exchange future. */ + private volatile GridDhtPartitionsExchangeFuture<K, V> lastExchangeFut; + + /** + * @param cctx Cache context. + * @param busyLock Shutdown lock. + */ + public GridDhtPartitionDemandPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) { + assert cctx != null; + assert busyLock != null; + + this.cctx = cctx; + this.busyLock = busyLock; + + log = cctx.logger(getClass()); + - top = cctx.dht().topology(); - + poolSize = cctx.preloadEnabled() ? cctx.config().getPreloadThreadPoolSize() : 0; + + if (poolSize > 0) { + barrier = new CyclicBarrier(poolSize); + + dmdWorkers = new ArrayList<>(poolSize); + + for (int i = 0; i < poolSize; i++) + dmdWorkers.add(new DemandWorker(i)); + + syncFut = new SyncFuture(dmdWorkers); + } + else { + dmdWorkers = Collections.emptyList(); + + syncFut = new SyncFuture(dmdWorkers); + + // Calling onDone() immediately since preloading is disabled. + syncFut.onDone(); + } + + timeout = new AtomicLong(cctx.config().getPreloadTimeout()); + } + + /** + * + */ + void start() { + if (poolSize > 0) { + for (DemandWorker w : dmdWorkers) + new IgniteThread(cctx.gridName(), "preloader-demand-worker", w).start(); + } + } + + /** + * + */ + void stop() { + U.cancel(dmdWorkers); + + if (log.isDebugEnabled()) + log.debug("Before joining on demand workers: " + dmdWorkers); + + U.join(dmdWorkers, log); + + if (log.isDebugEnabled()) + log.debug("After joining on demand workers: " + dmdWorkers); + - top = null; + lastExchangeFut = null; + + lastTimeoutObj.set(null); + } + + /** + * @return Future for {@link org.apache.ignite.cache.CachePreloadMode#SYNC} mode. + */ + IgniteFuture<?> syncFuture() { + return syncFut; + } + + /** + * Sets preload predicate for demand pool. + * + * @param preloadPred Preload predicate. + */ + void preloadPredicate(IgnitePredicate<GridCacheEntryInfo<K, V>> preloadPred) { + this.preloadPred = preloadPred; + } + + /** + * @return Size of this thread pool. + */ + int poolSize() { + return poolSize; + } + + /** + * Wakes up demand workers when a new exchange future is added. + */ + void onExchangeFutureAdded() { + synchronized (dmdWorkers) { + for (DemandWorker w : dmdWorkers) + w.addMessage(DUMMY_TOP); + } + } + + /** + * Force preload. 
+ */ + void forcePreload() { + GridTimeoutObject obj = lastTimeoutObj.getAndSet(null); + + if (obj != null) + cctx.time().removeTimeoutObject(obj); + + final GridDhtPartitionsExchangeFuture<K, V> exchFut = lastExchangeFut; + + if (exchFut != null) { + if (log.isDebugEnabled()) + log.debug("Forcing preload event for future: " + exchFut); + + exchFut.listenAsync(new CI1<IgniteFuture<Long>>() { + @Override public void apply(IgniteFuture<Long> t) { + cctx.shared().exchange().forcePreloadExchange(exchFut); + } + }); + } + else if (log.isDebugEnabled()) + log.debug("Ignoring force preload request (no topology event happened yet)."); + } + + /** + * @return {@code true} if entered to busy state. + */ + private boolean enterBusy() { + if (busyLock.readLock().tryLock()) + return true; + + if (log.isDebugEnabled()) + log.debug("Failed to enter to busy state (demander is stopping): " + cctx.nodeId()); + + return false; + } + + /** + * + */ + private void leaveBusy() { + busyLock.readLock().unlock(); + } + + /** + * @param type Type. + * @param discoEvt Discovery event. + */ + private void preloadEvent(int type, IgniteDiscoveryEvent discoEvt) { + preloadEvent(-1, type, discoEvt); + } + + /** + * @param part Partition. + * @param type Type. + * @param discoEvt Discovery event. + */ + private void preloadEvent(int part, int type, IgniteDiscoveryEvent discoEvt) { + assert discoEvt != null; + + cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); + } + + /** - * @return Dummy node-left message. - */ - private SupplyMessage<K, V> dummyTopology() { - return DUMMY_TOP; - } - - /** + * @param msg Message to check. + * @return {@code True} if dummy message. + */ + private boolean dummyTopology(SupplyMessage<K, V> msg) { + return msg == DUMMY_TOP; + } + + /** + * @param deque Deque to poll from. + * @param time Time to wait. + * @param w Worker. + * @return Polled item. + * @throws InterruptedException If interrupted. + */ + @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException { + assert w != null; + + // There is currently a case where {@code interrupted} + // flag on a thread gets flipped during stop which causes the pool to hang. This check + // will always make sure that interrupted flag gets reset before going into wait conditions. + // The true fix should actually make sure that interrupted flag does not get reset or that + // interrupted exception gets propagated. Until we find a real fix, this method should + // always work to make sure that there is no hanging during stop. + if (w.isCancelled()) + Thread.currentThread().interrupt(); + + return deque.poll(time, MILLISECONDS); + } + + /** + * @param p Partition. + * @param topVer Topology version. + * @return Picked owners. + */ + private Collection<ClusterNode> pickedOwners(int p, long topVer) { + Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer); + + int affCnt = affNodes.size(); + + Collection<ClusterNode> rmts = remoteOwners(p, topVer); + + int rmtCnt = rmts.size(); + + if (rmtCnt <= affCnt) + return rmts; + + List<ClusterNode> sorted = new ArrayList<>(rmts); + + // Sort in descending order, so nodes with higher order will be first. + Collections.sort(sorted, CU.nodeComparator(false)); + + // Pick newest nodes. + return sorted.subList(0, affCnt); + } + + /** + * @param p Partition. + * @param topVer Topology version. + * @return Nodes owning this partition. 
+ */ + private Collection<ClusterNode> remoteOwners(int p, long topVer) { - return F.view(top.owners(p, topVer), F.remoteNodes(cctx.nodeId())); ++ return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId())); + } + + /** + * @param assigns Assignments. + * @param force {@code True} if dummy reassign. + */ + void addAssignments(final GridDhtPreloaderAssignments<K, V> assigns, boolean force) { + if (log.isDebugEnabled()) + log.debug("Adding partition assignments: " + assigns); + + long delay = cctx.config().getPreloadPartitionedDelay(); + + if (delay == 0 || force) { + assert assigns != null; + + synchronized (dmdWorkers) { + for (DemandWorker w : dmdWorkers) { + w.addAssignments(assigns); + + w.addMessage(DUMMY_TOP); + } + } + } + else if (delay > 0) { + assert !force; + + GridTimeoutObject obj = lastTimeoutObj.get(); + + if (obj != null) + cctx.time().removeTimeoutObject(obj); + + final GridDhtPartitionsExchangeFuture<K, V> exchFut = lastExchangeFut; + + assert exchFut != null : "Delaying preload process without topology event."; + + obj = new GridTimeoutObjectAdapter(delay) { + @Override public void onTimeout() { + exchFut.listenAsync(new CI1<IgniteFuture<Long>>() { + @Override public void apply(IgniteFuture<Long> f) { + cctx.shared().exchange().forcePreloadExchange(exchFut); + } + }); + } + }; + + lastTimeoutObj.set(obj); + + cctx.time().addTimeoutObject(obj); + } + } + + /** + * + */ + void unwindUndeploys() { + demandLock.writeLock().lock(); + + try { + cctx.deploy().unwind(); + } + finally { + demandLock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtPartitionDemandPool.class, this); + } + + /** + * + */ + private class DemandWorker extends GridWorker { + /** Worker ID. */ + private int id; + + /** Partition-to-node assignments. */ + private final LinkedBlockingDeque<GridDhtPreloaderAssignments<K, V>> assignQ = new LinkedBlockingDeque<>(); + + /** Message queue. */ + private final LinkedBlockingDeque<SupplyMessage<K, V>> msgQ = + new LinkedBlockingDeque<>(); + + /** Counter. */ + private long cntr; + + /** Hide worker logger and use cache logger instead. */ + private IgniteLogger log = GridDhtPartitionDemandPool.this.log; + + /** + * @param id Worker ID. + */ + private DemandWorker(int id) { + super(cctx.gridName(), "preloader-demand-worker", GridDhtPartitionDemandPool.this.log); + + assert id >= 0; + + this.id = id; + } + + /** + * @param assigns Assignments. + */ + void addAssignments(GridDhtPreloaderAssignments<K, V> assigns) { + assert assigns != null; + + assignQ.offer(assigns); + + if (log.isDebugEnabled()) + log.debug("Added assignments to worker: " + this); + } + + /** + * @return {@code True} if topology changed. + */ + private boolean topologyChanged() { + return !assignQ.isEmpty() || cctx.shared().exchange().topologyChanged(); + } + + /** + * @param msg Message. + */ + private void addMessage(SupplyMessage<K, V> msg) { + if (!enterBusy()) + return; + + try { + assert dummyTopology(msg) || msg.supply().workerId() == id; + + msgQ.offer(msg); + } + finally { + leaveBusy(); + } + } + + /** + * @param timeout Timed out value. + */ + private void growTimeout(long timeout) { + long newTimeout = (long)(timeout * 1.5D); + + // Account for overflow. + if (newTimeout < 0) + newTimeout = Long.MAX_VALUE; + + // Grow by 50% only if another thread didn't do it already. 
+ if (GridDhtPartitionDemandPool.this.timeout.compareAndSet(timeout, newTimeout)) + U.warn(log, "Increased preloading message timeout from " + timeout + "ms to " + + newTimeout + "ms."); + } + + /** + * @param pick Node picked for preloading. + * @param p Partition. + * @param entry Preloaded entry. + * @param topVer Topology version. + * @return {@code False} if partition has become invalid during preloading. + * @throws org.apache.ignite.IgniteInterruptedException If interrupted. + */ + private boolean preloadEntry(ClusterNode pick, int p, GridCacheEntryInfo<K, V> entry, long topVer) - throws IgniteCheckedException, IgniteInterruptedException { ++ throws IgniteCheckedException { + try { + GridCacheEntryEx<K, V> cached = null; + + try { + cached = cctx.dht().entryEx(entry.key()); + + if (log.isDebugEnabled()) + log.debug("Preloading key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']'); + + if (cctx.dht().isGgfsDataCache() && + cctx.dht().ggfsDataSpaceUsed() > cctx.dht().ggfsDataSpaceMax()) { + LT.error(log, null, "Failed to preload GGFS data cache (GGFS space size exceeded maximum " + + "value, will ignore preload entries): " + name()); + + if (cached.markObsoleteIfEmpty(null)) + cached.context().cache().removeIfObsolete(cached.key()); + + return true; + } + + if (preloadPred == null || preloadPred.apply(entry)) { + if (cached.initialValue( + entry.value(), + entry.valueBytes(), + entry.version(), + entry.ttl(), + entry.expireTime(), + true, + topVer, + cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE + )) { + cctx.evicts().touch(cached, topVer); // Start tracking. + + if (cctx.events().isRecordable(EVT_CACHE_PRELOAD_OBJECT_LOADED) && !cached.isInternal()) + cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), + (IgniteUuid)null, null, EVT_CACHE_PRELOAD_OBJECT_LOADED, entry.value(), true, null, + false, null, null, null); + } + else if (log.isDebugEnabled()) + log.debug("Preloading entry is already in cache (will ignore) [key=" + cached.key() + + ", part=" + p + ']'); + } + else if (log.isDebugEnabled()) + log.debug("Preload predicate evaluated to false for entry (will ignore): " + entry); + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Entry has been concurrently removed while preloading (will ignore) [key=" + + cached.key() + ", part=" + p + ']'); + } + catch (GridDhtInvalidPartitionException ignored) { + if (log.isDebugEnabled()) + log.debug("Partition became invalid during preloading (will ignore): " + p); + + return false; + } + } + catch (IgniteInterruptedException e) { + throw e; + } + catch (IgniteCheckedException e) { + throw new IgniteCheckedException("Failed to cache preloaded entry (will stop preloading) [local=" + + cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e); + } + + return true; + } + + /** + * @param idx Unique index for this topic. + * @return Topic for partition. + */ + public Object topic(long idx) { + return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx); + } + + /** + * @param node Node to demand from. + * @param topVer Topology version. + * @param d Demand message. + * @param exchFut Exchange future. + * @return Missed partitions. + * @throws InterruptedException If interrupted. + * @throws ClusterTopologyException If node left. + * @throws IgniteCheckedException If failed to send message. 
+ */ + private Set<Integer> demandFromNode(ClusterNode node, final long topVer, GridDhtPartitionDemandMessage<K, V> d, + GridDhtPartitionsExchangeFuture<K, V> exchFut) throws InterruptedException, IgniteCheckedException { + cntr++; + + d.topic(topic(cntr)); + d.workerId(id); + + Set<Integer> missed = new HashSet<>(); + + // Get the same collection that will be sent in the message. + Collection<Integer> remaining = d.partitions(); + + // Drain queue before processing a new node. + drainQueue(); + + if (isCancelled() || topologyChanged()) + return missed; + + cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage<K, V>>() { + @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage<K, V> msg) { + addMessage(new SupplyMessage<>(nodeId, msg)); + } + }); + + try { + boolean retry; + + // DoWhile. + // ======= + do { + retry = false; + + // Create copy. + d = new GridDhtPartitionDemandMessage<>(d, remaining); + + long timeout = GridDhtPartitionDemandPool.this.timeout.get(); + + d.timeout(timeout); + + if (log.isDebugEnabled()) + log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']'); + + // Send demand message. + cctx.io().send(node, d); + + // While. + // ===== + while (!isCancelled() && !topologyChanged()) { + SupplyMessage<K, V> s = poll(msgQ, timeout, this); + + // If timed out. + if (s == null) { + if (msgQ.isEmpty()) { // Safety check. + U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout + + " ms (you may need to increase 'networkTimeout' or 'preloadBatchSize'" + + " configuration properties)."); + + growTimeout(timeout); + + // Ordered listener was removed if timeout expired. + cctx.io().removeOrderedHandler(d.topic()); + + // Must create copy to be able to work with IO manager thread local caches. + d = new GridDhtPartitionDemandMessage<>(d, remaining); + + // Create new topic. + d.topic(topic(++cntr)); + + // Create new ordered listener. + cctx.io().addOrderedHandler(d.topic(), + new CI2<UUID, GridDhtPartitionSupplyMessage<K, V>>() { + @Override public void apply(UUID nodeId, + GridDhtPartitionSupplyMessage<K, V> msg) { + addMessage(new SupplyMessage<>(nodeId, msg)); + } + }); + + // Resend message with larger timeout. + retry = true; + + break; // While. + } + else + continue; // While. + } + + // If topology changed. + if (dummyTopology(s)) { + if (topologyChanged()) + break; // While. + else + continue; // While. + } + + // Check that message was received from expected node. + if (!s.senderId().equals(node.id())) { + U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() + + ", rcvdId=" + s.senderId() + ", msg=" + s + ']'); + + continue; // While. + } + + if (log.isDebugEnabled()) + log.debug("Received supply message: " + s); + + GridDhtPartitionSupplyMessage<K, V> supply = s.supply(); + + // Check whether there were class loading errors on unmarshal + if (supply.classError() != null) { + if (log.isDebugEnabled()) + log.debug("Class got undeployed during preloading: " + supply.classError()); + + retry = true; + + // Quit preloading. + break; + } + + // Preload. 
+ for (Map.Entry<Integer, Collection<GridCacheEntryInfo<K, V>>> e : supply.infos().entrySet()) { + int p = e.getKey(); + + if (cctx.affinity().localNode(p, topVer)) { - GridDhtLocalPartition<K, V> part = top.localPartition(p, topVer, true); ++ GridDhtLocalPartition<K, V> part = cctx.dht().topology().localPartition(p, topVer, true); + + assert part != null; + + if (part.state() == MOVING) { + boolean reserved = part.reserve(); + + assert reserved : "Failed to reserve partition [gridName=" + + cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']'; + + part.lock(); + + try { + Collection<Integer> invalidParts = new GridLeanSet<>(); + + // Loop through all received entries and try to preload them. + for (GridCacheEntryInfo<K, V> entry : e.getValue()) { + if (!invalidParts.contains(p)) { + if (!part.preloadingPermitted(entry.key(), entry.version())) { + if (log.isDebugEnabled()) + log.debug("Preloading is not permitted for entry due to " + + "evictions [key=" + entry.key() + + ", ver=" + entry.version() + ']'); + + continue; + } + + if (!preloadEntry(node, p, entry, topVer)) { + invalidParts.add(p); + + if (log.isDebugEnabled()) + log.debug("Got entries for invalid partition during " + + "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); + } + } + } + + boolean last = supply.last().contains(p); + + // If message was last for this partition, + // then we take ownership. + if (last) { + remaining.remove(p); + - top.own(part); ++ cctx.dht().topology().own(part); + + if (log.isDebugEnabled()) + log.debug("Finished preloading partition: " + part); + + if (cctx.events().isRecordable(EVT_CACHE_PRELOAD_PART_LOADED)) + preloadEvent(p, EVT_CACHE_PRELOAD_PART_LOADED, + exchFut.discoveryEvent()); + } + } + finally { + part.unlock(); + part.release(); + } + } + else { + remaining.remove(p); + + if (log.isDebugEnabled()) + log.debug("Skipping loading partition (state is not MOVING): " + part); + } + } + else { + remaining.remove(p); + + if (log.isDebugEnabled()) + log.debug("Skipping loading partition (it does not belong on current node): " + p); + } + } + + remaining.removeAll(s.supply().missed()); + + // Only request partitions based on latest topology version. + for (Integer miss : s.supply().missed()) + if (cctx.affinity().localNode(miss, topVer)) + missed.add(miss); + + if (remaining.isEmpty()) + break; // While. + + if (s.supply().ack()) { + retry = true; + + break; + } + } + } + while (retry && !isCancelled() && !topologyChanged()); + + return missed; + } + finally { + cctx.io().removeOrderedHandler(d.topic()); + } + } + + /** + * @throws InterruptedException If interrupted. 
+ */ + private void drainQueue() throws InterruptedException { + while (!msgQ.isEmpty()) { + SupplyMessage<K, V> msg = msgQ.take(); + + if (log.isDebugEnabled()) + log.debug("Drained supply message: " + msg); + } + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedException { + try { + int preloadOrder = cctx.config().getPreloadOrder(); + + if (preloadOrder > 0) { + IgniteFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(preloadOrder); + + try { + if (fut != null) { + if (log.isDebugEnabled()) + log.debug("Waiting for dependant caches preload [cacheName=" + cctx.name() + + ", preloadOrder=" + preloadOrder + ']'); + + fut.get(); + } + } + catch (IgniteInterruptedException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to wait for ordered preload future (grid is stopping): " + + "[cacheName=" + cctx.name() + ", preloadOrder=" + preloadOrder + ']'); + + return; + } + catch (IgniteCheckedException e) { + throw new Error("Ordered preload future should never fail: " + e.getMessage(), e); + } + } + + GridDhtPartitionsExchangeFuture<K, V> exchFut = null; + + boolean stopEvtFired = false; + + while (!isCancelled()) { + try { + barrier.await(); + + if (id == 0 && exchFut != null && !exchFut.dummy() && + cctx.events().isRecordable(EVT_CACHE_PRELOAD_STOPPED)) { + + if (!cctx.isReplicated() || !stopEvtFired) { + preloadEvent(EVT_CACHE_PRELOAD_STOPPED, exchFut.discoveryEvent()); + + stopEvtFired = true; + } + } + } + catch (BrokenBarrierException ignore) { + throw new InterruptedException("Demand worker stopped."); + } + + // Sync up all demand threads at this step. + GridDhtPreloaderAssignments<K, V> assigns = null; + + while (assigns == null) + assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this); + + demandLock.readLock().lock(); + + try { + exchFut = assigns.exchangeFuture(); + + // Assignments are empty if preloading is disabled. + if (assigns.isEmpty()) + continue; + + boolean resync = false; + + // While. + // ===== + while (!isCancelled() && !topologyChanged() && !resync) { + Collection<Integer> missed = new HashSet<>(); + + // For. + // === + for (ClusterNode node : assigns.keySet()) { + if (topologyChanged() || isCancelled()) + break; // For. + + GridDhtPartitionDemandMessage<K, V> d = assigns.remove(node); + + // If another thread is already processing this message, + // move to the next node. + if (d == null) + continue; // For. + + try { + Set<Integer> set = demandFromNode(node, assigns.topologyVersion(), d, exchFut); + + if (!set.isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" + + set + ']'); + + missed.addAll(set); + } + } + catch (IgniteInterruptedException e) { + throw e; + } + catch (ClusterTopologyException e) { + if (log.isDebugEnabled()) + log.debug("Node left during preloading (will retry) [node=" + node.id() + + ", msg=" + e.getMessage() + ']'); + + resync = true; + + break; // For. + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to receive partitions from node (preloading will not " + + "fully finish) [node=" + node.id() + ", msg=" + d + ']', e); + } + } + + // Processed missed entries. + if (!missed.isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Reassigning partitions that were missed: " + missed); + + assert exchFut.exchangeId() != null; + + cctx.shared().exchange().forceDummyExchange(true, exchFut); + } + else + break; // While. 
+ } + } + finally { + demandLock.readLock().unlock(); + + syncFut.onWorkerDone(this); + } + + cctx.shared().exchange().scheduleResendPartitions(); + } + } + finally { + // Safety. + syncFut.onWorkerDone(this); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString()); + } + } + + /** + * Sets last exchange future. + * + * @param lastFut Last future to set. + */ + void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture<K, V> lastFut) { + lastExchangeFut = lastFut; + } + + /** + * @param exchFut Exchange future. + * @return Assignments of partitions to nodes. + */ + GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture<K, V> exchFut) { + // No assignments for disabled preloader. ++ GridDhtPartitionTopology<K, V> top = cctx.dht().topology(); ++ + if (!cctx.preloadEnabled()) + return new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion()); + + int partCnt = cctx.affinity().partitions(); + + assert exchFut.forcePreload() || exchFut.dummyReassign() || + exchFut.exchangeId().topologyVersion() == top.topologyVersion() : - "Topology version mismatch [exchId=" + exchFut.exchangeId() + ", topVer=" + top.topologyVersion() + ']'; ++ "Topology version mismatch [exchId=" + exchFut.exchangeId() + ++ ", topVer=" + top.topologyVersion() + ']'; + + GridDhtPreloaderAssignments<K, V> assigns = new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion()); + + long topVer = assigns.topologyVersion(); + + for (int p = 0; p < partCnt; p++) { + if (cctx.shared().exchange().hasPendingExchange()) { + if (log.isDebugEnabled()) + log.debug("Skipping assignments creation, exchange worker has pending assignments: " + + exchFut.exchangeId()); + + break; + } + + // If partition belongs to local node. + if (cctx.affinity().localNode(p, topVer)) { + GridDhtLocalPartition<K, V> part = top.localPartition(p, topVer, true); + + assert part != null; + assert part.id() == p; + + if (part.state() != MOVING) { + if (log.isDebugEnabled()) + log.debug("Skipping partition assignment (state is not MOVING): " + part); + + continue; // For. + } + + Collection<ClusterNode> picked = pickedOwners(p, topVer); + + if (picked.isEmpty()) { + top.own(part); + + if (log.isDebugEnabled()) + log.debug("Owning partition as there are no other owners: " + part); + } + else { + ClusterNode n = F.first(picked); + + GridDhtPartitionDemandMessage<K, V> msg = assigns.get(n); + + if (msg == null) { + assigns.put(n, msg = new GridDhtPartitionDemandMessage<>( + top.updateSequence(), + exchFut.exchangeId().topologyVersion(), + cctx.cacheId())); + } + + msg.addPartition(p); + } + } + } + + return assigns; + } + + /** + * + */ + private class SyncFuture extends GridFutureAdapter<Object> { + /** */ + private static final long serialVersionUID = 0L; + + /** Remaining workers. */ + private Collection<DemandWorker> remaining; + + /** + * @param workers List of workers. + */ + private SyncFuture(Collection<DemandWorker> workers) { + super(cctx.kernalContext()); + + assert workers.size() == poolSize(); + + remaining = Collections.synchronizedList(new LinkedList<>(workers)); + } + + /** + * Empty constructor required for {@link Externalizable}. + */ + public SyncFuture() { + assert false; + } + + /** + * @param w Worker who iterated through all partitions. 
+ */ + void onWorkerDone(DemandWorker w) { + if (isDone()) + return; + + if (remaining.remove(w)) + if (log.isDebugEnabled()) + log.debug("Completed full partition iteration for worker [worker=" + w + ']'); + + if (remaining.isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Completed sync future."); + + onDone(); + } + } + } + + /** + * Supply message wrapper. + */ + private static class SupplyMessage<K, V> { + /** Sender ID. */ + private UUID sndId; + + /** Supply message. */ + private GridDhtPartitionSupplyMessage<K, V> supply; + + /** + * Dummy constructor. + */ + private SupplyMessage() { + // No-op. + } + + /** + * @param sndId Sender ID. + * @param supply Supply message. + */ + SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage<K, V> supply) { + this.sndId = sndId; + this.supply = supply; + } + + /** + * @return Sender ID. + */ + UUID senderId() { + return sndId; + } + + /** + * @return Message. + */ + GridDhtPartitionSupplyMessage<K, V> supply() { + return supply; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SupplyMessage.class, this); + } + } + }
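A note on DemandWorker.growTimeout() above: when a supply message times out, each worker proposes timeout * 1.5 (clamped to Long.MAX_VALUE on overflow), but the compareAndSet against the previously observed value guarantees the bump is applied once even when several workers time out together. The following is a minimal sketch of that guard, not Ignite code; class and method names are illustrative only.

import java.util.concurrent.atomic.AtomicLong;

/**
 * Minimal sketch of the CAS-guarded timeout growth in DemandWorker.growTimeout():
 * every timed-out worker proposes a 50% increase, but only the first proposal
 * against a given observed value wins, so concurrent workers cannot compound it.
 */
public class TimeoutGrowthSketch {
    /** Shared preload timeout, as in GridDhtPartitionDemandPool. */
    private static final AtomicLong timeout = new AtomicLong(10_000);

    static void growTimeout(long observed) {
        long newTimeout = (long)(observed * 1.5D);

        // Account for overflow, as the original does.
        if (newTimeout < 0)
            newTimeout = Long.MAX_VALUE;

        // Grow by 50% only if another thread didn't do it already.
        if (timeout.compareAndSet(observed, newTimeout))
            System.out.println("Increased timeout from " + observed + "ms to " + newTimeout + "ms.");
        else
            System.out.println("Lost the race; current timeout: " + timeout.get() + "ms.");
    }

    public static void main(String[] args) {
        long seen = timeout.get(); // Two workers observe the same expired timeout.

        growTimeout(seen); // Wins the CAS: 10000 -> 15000.
        growTimeout(seen); // Loses: the value is no longer 10000.
    }
}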