http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java index 0000000,50cf8d9..865a16e mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java @@@ -1,0 -1,575 +1,567 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + + import org.apache.ignite.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.thread.*; + import org.apache.ignite.internal.managers.deployment.*; + import org.apache.ignite.internal.processors.cache.distributed.dht.*; + import org.apache.ignite.internal.util.lang.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.internal.util.worker.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.util.*; + import java.util.concurrent.*; + import java.util.concurrent.locks.*; + + import static java.util.concurrent.TimeUnit.*; + import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; + + /** + * Thread pool for supplying partitions to demanding nodes. + */ + class GridDhtPartitionSupplyPool<K, V> { + /** */ + private final GridCacheContext<K, V> cctx; + + /** */ + private final IgniteLogger log; + + /** */ + private final ReadWriteLock busyLock; + + /** */ + private GridDhtPartitionTopology<K, V> top; + + /** */ + private final Collection<SupplyWorker> workers = new LinkedList<>(); + + /** */ + private final BlockingQueue<DemandMessage<K, V>> queue = new LinkedBlockingDeque<>(); + + /** */ + private final boolean depEnabled; + + /** Preload predicate. */ + private IgnitePredicate<GridCacheEntryInfo<K, V>> preloadPred; + + /** + * @param cctx Cache context. + * @param busyLock Shutdown lock. + */ + GridDhtPartitionSupplyPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) { + assert cctx != null; + assert busyLock != null; + + this.cctx = cctx; + this.busyLock = busyLock; + + log = cctx.logger(getClass()); + + top = cctx.dht().topology(); + + int poolSize = cctx.preloadEnabled() ? 
cctx.config().getPreloadThreadPoolSize() : 0; + + for (int i = 0; i < poolSize; i++) + workers.add(new SupplyWorker()); + + cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage<K, V>>() { + @Override public void apply(UUID id, GridDhtPartitionDemandMessage<K, V> m) { + processDemandMessage(id, m); + } + }); + + depEnabled = cctx.gridDeploy().enabled(); + } + + /** + * + */ + void start() { + for (SupplyWorker w : workers) + new IgniteThread(cctx.gridName(), "preloader-supply-worker", w).start(); + } + + /** + * + */ + void stop() { + U.cancel(workers); + U.join(workers, log); + + top = null; + } + + /** + * Sets preload predicate for supply pool. + * + * @param preloadPred Preload predicate. + */ + void preloadPredicate(IgnitePredicate<GridCacheEntryInfo<K, V>> preloadPred) { + this.preloadPred = preloadPred; + } + + /** + * @return Size of this thread pool. + */ + int poolSize() { + return cctx.config().getPreloadThreadPoolSize(); + } + + /** + * @return {@code true} if entered to busy state. + */ + private boolean enterBusy() { + if (busyLock.readLock().tryLock()) + return true; + + if (log.isDebugEnabled()) + log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId()); + + return false; + } + + /** + * @param nodeId Sender node ID. + * @param d Message. + */ + private void processDemandMessage(UUID nodeId, GridDhtPartitionDemandMessage<K, V> d) { + if (!enterBusy()) + return; + + try { + if (cctx.preloadEnabled()) { + if (log.isDebugEnabled()) + log.debug("Received partition demand [node=" + nodeId + ", demand=" + d + ']'); + + queue.offer(new DemandMessage<>(nodeId, d)); + } + else + U.warn(log, "Received partition demand message when preloading is disabled (will ignore): " + d); + } + finally { + leaveBusy(); + } + } + + /** + * + */ + private void leaveBusy() { + busyLock.readLock().unlock(); + } + + /** + * @param deque Deque to poll from. + * @param w Worker. 
+ * @return Polled item. + * @throws InterruptedException If interrupted. + */ + @Nullable private <T> T poll(BlockingQueue<T> deque, GridWorker w) throws InterruptedException { + assert w != null; + + // There is currently a case where {@code interrupted} + // flag on a thread gets flipped during stop which causes the pool to hang. This check + // will always make sure that interrupted flag gets reset before going into wait conditions. + // The true fix should actually make sure that interrupted flag does not get reset or that + // interrupted exception gets propagated. Until we find a real fix, this method should + // always work to make sure that there is no hanging during stop. + if (w.isCancelled()) + Thread.currentThread().interrupt(); + + return deque.poll(2000, MILLISECONDS); + } + + /** + * Supply work. + */ + private class SupplyWorker extends GridWorker { + /** Hide worker logger and use cache logger. */ + private IgniteLogger log = GridDhtPartitionSupplyPool.this.log; + + /** + * Default constructor. + */ + private SupplyWorker() { + super(cctx.gridName(), "preloader-supply-worker", GridDhtPartitionSupplyPool.this.log); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedException { + while (!isCancelled()) { + DemandMessage<K, V> msg = poll(queue, this); + + if (msg == null) + continue; + + ClusterNode node = cctx.discovery().node(msg.senderId()); + + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Received message from non-existing node (will ignore): " + msg); + + continue; + } + + processMessage(msg, node); + } + } + + /** + * @param msg Message. + * @param node Demander. 
+ */ + private void processMessage(DemandMessage<K, V> msg, ClusterNode node) { + assert msg != null; + assert node != null; + + GridDhtPartitionDemandMessage<K, V> d = msg.message(); + + GridDhtPartitionSupplyMessage<K, V> s = new GridDhtPartitionSupplyMessage<>(d.workerId(), + d.updateSequence(), cctx.cacheId()); + + long preloadThrottle = cctx.config().getPreloadThrottle(); + + boolean ack = false; + + // If demander node left grid. + boolean nodeLeft = false; + + boolean convertPortable = cctx.portableEnabled() && cctx.offheapTiered(); + + try { + // Partition map exchange is finished which means that all near transactions with given + // topology version are committed. We can wait for local locks here as it will not take + // much time. + cctx.mvcc().finishLocks(d.topologyVersion()).get(); + + for (Integer part : d.partitions()) { + GridDhtLocalPartition<K, V> loc = top.localPartition(part, d.topologyVersion(), false); + + if (loc == null || loc.state() != OWNING || !loc.reserve()) { + // Reply with partition of "-1" to let sender know that + // this node is no longer an owner. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Requested partition is not owned by local node [part=" + part + + ", demander=" + msg.senderId() + ']'); + + continue; + } + + GridCacheEntryInfoCollectSwapListener<K, V> swapLsnr = null; + + try { + if (cctx.isSwapOrOffheapEnabled()) { + swapLsnr = new GridCacheEntryInfoCollectSwapListener<>(log, cctx); + + cctx.swap().addOffHeapListener(part, swapLsnr); + cctx.swap().addSwapListener(part, swapLsnr); + } + + boolean partMissing = false; + + for (GridCacheEntryEx<K, V> e : loc.entries()) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, so we send '-1' partition and move on. 
+ s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition [part=" + part + + ", nodeId=" + msg.senderId() + ']'); + + partMissing = true; + + break; + } + + if (s.messageSize() >= cctx.config().getPreloadBatchSize()) { + ack = true; + + if (!reply(node, d, s)) { + nodeLeft = true; + + return; + } + + // Throttle preloading. + if (preloadThrottle > 0) + U.sleep(preloadThrottle); + + s = new GridDhtPartitionSupplyMessage<>(d.workerId(), d.updateSequence(), + cctx.cacheId()); + } + + GridCacheEntryInfo<K, V> info = e.info(); + + if (info != null && !(info.key() instanceof GridPartitionLockKey) && !info.isNew()) { + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry(part, info, cctx.shared()); + else if (log.isDebugEnabled()) + log.debug("Preload predicate evaluated to false (will not sender cache entry): " + + info); + } + } + + if (partMissing) + continue; + + if (cctx.isSwapOrOffheapEnabled()) { + GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry<V>>> iter = + cctx.swap().iterator(part, false); + + // Iterator may be null if space does not exist. + if (iter != null) { + try { + boolean prepared = false; + + for (Map.Entry<byte[], GridCacheSwapEntry<V>> e : iter) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, + // so we send '-1' partition and move on. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition " + + "[part=" + part + ", nodeId=" + msg.senderId() + ']'); + + partMissing = true; + + break; // For. + } + + if (s.messageSize() >= cctx.config().getPreloadBatchSize()) { + ack = true; + + if (!reply(node, d, s)) { + nodeLeft = true; + + return; + } + + // Throttle preloading. 
+ if (preloadThrottle > 0) + U.sleep(preloadThrottle); + + s = new GridDhtPartitionSupplyMessage<>(d.workerId(), + d.updateSequence(), cctx.cacheId()); + } + + GridCacheSwapEntry<V> swapEntry = e.getValue(); + + GridCacheEntryInfo<K, V> info = new GridCacheEntryInfo<>(); + + info.keyBytes(e.getKey()); + info.ttl(swapEntry.ttl()); + info.expireTime(swapEntry.expireTime()); + info.version(swapEntry.version()); + + if (!swapEntry.valueIsByteArray()) { + if (convertPortable) + info.valueBytes(cctx.convertPortableBytes(swapEntry.valueBytes())); + else + info.valueBytes(swapEntry.valueBytes()); + } + else + info.value(swapEntry.value()); + + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry0(part, info, cctx.shared()); + else { + if (log.isDebugEnabled()) + log.debug("Preload predicate evaluated to false (will not send " + + "cache entry): " + info); + + continue; + } + + // Need to manually prepare cache message. + if (depEnabled && !prepared) { + ClassLoader ldr = swapEntry.keyClassLoaderId() != null ? + cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) : + swapEntry.valueClassLoaderId() != null ? + cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) : + null; + + if (ldr == null) + continue; + + if (ldr instanceof GridDeploymentInfo) { + s.prepare((GridDeploymentInfo)ldr); + + prepared = true; + } + } + } + + if (partMissing) + continue; + } + finally { + iter.close(); + } + } + } + + // Stop receiving promote notifications. + if (swapLsnr != null) { + cctx.swap().removeOffHeapListener(part, swapLsnr); + cctx.swap().removeSwapListener(part, swapLsnr); + } + + if (swapLsnr != null) { + Collection<GridCacheEntryInfo<K, V>> entries = swapLsnr.entries(); + + swapLsnr = null; + + for (GridCacheEntryInfo<K, V> info : entries) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, + // so we send '-1' partition and move on. 
+ s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition " + + "[part=" + part + ", nodeId=" + msg.senderId() + ']'); + + // No need to continue iteration over swap entries. + break; + } + + if (s.messageSize() >= cctx.config().getPreloadBatchSize()) { + ack = true; + + if (!reply(node, d, s)) { + nodeLeft = true; + + return; + } + + s = new GridDhtPartitionSupplyMessage<>(d.workerId(), d.updateSequence(), + cctx.cacheId()); + } + + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry(part, info, cctx.shared()); + else if (log.isDebugEnabled()) + log.debug("Preload predicate evaluated to false (will not sender cache entry): " + + info); + } + } + + // Mark as last supply message. + s.last(part); + + if (ack) { + s.markAck(); + + break; // Partition for loop. + } + } + finally { + loc.release(); + + if (swapLsnr != null) { + cctx.swap().removeOffHeapListener(part, swapLsnr); + cctx.swap().removeSwapListener(part, swapLsnr); + } + } + } + + reply(node, d, s); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send partition supply message to node: " + node.id(), e); - - // Removing current topic because of request must fail with timeout and - // demander will generate new topic. - cctx.io().removeMessageId(d.topic()); - } - finally { - if (!ack || nodeLeft) - cctx.io().removeMessageId(d.topic()); + } + } + + /** + * @param n Node. + * @param d Demand message. + * @param s Supply message. + * @return {@code True} if message was sent, {@code false} if recipient left grid. + * @throws IgniteCheckedException If failed. 
+ */ + private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage<K, V> d, GridDhtPartitionSupplyMessage<K, V> s) + throws IgniteCheckedException { + try { + if (log.isDebugEnabled()) + log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); + - cctx.io().sendOrderedMessage(n, d.topic(), cctx.io().messageId(d.topic(), n.id()), s, d.timeout()); ++ cctx.io().sendOrderedMessage(n, d.topic(), s, d.timeout()); + + return true; + } + catch (ClusterTopologyException ignore) { + if (log.isDebugEnabled()) + log.debug("Failed to send partition supply message because node left grid: " + n.id()); + + return false; + } + } + } + + /** + * Demand message wrapper. + */ + private static class DemandMessage<K, V> extends IgniteBiTuple<UUID, GridDhtPartitionDemandMessage<K, V>> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param sndId Sender ID. + * @param msg Message. + */ + DemandMessage(UUID sndId, GridDhtPartitionDemandMessage<K, V> msg) { + super(sndId, msg); + } + + /** + * Empty constructor required for {@link Externalizable}. + */ + public DemandMessage() { + // No-op. + } + + /** + * @return Sender ID. + */ + UUID senderId() { + return get1(); + } + + /** + * @return Message. + */ + public GridDhtPartitionDemandMessage<K, V> message() { + return get2(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "DemandMessage [senderId=" + senderId() + ", msg=" + message() + ']'; + } + } + }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 0000000,e7cd7f3..5be7d9f mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@@ -1,0 -1,793 +1,770 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.ignite.internal.processors.cache.query; + + import org.apache.ignite.*; + import org.apache.ignite.cache.*; + import org.apache.ignite.cache.query.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.events.*; + import org.apache.ignite.internal.util.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.managers.eventstorage.*; + import org.apache.ignite.internal.processors.query.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jdk8.backport.*; + import org.jetbrains.annotations.*; + + import java.util.*; + import java.util.concurrent.*; + + import static org.apache.ignite.events.IgniteEventType.*; + import static org.apache.ignite.cache.CacheMode.*; + import static org.apache.ignite.internal.GridTopic.*; + + /** + * Distributed query manager. + */ + public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManager<K, V> { + /** */ + private static final int MAX_CANCEL_IDS = 1000; + + /** Query response frequency. */ + private static final long RESEND_FREQ = 3000; + + /** Query response attempts. */ + private static final int RESEND_ATTEMPTS = 5; + + /** Prefix for communication topic. */ + private static final String TOPIC_PREFIX = "QUERY"; + + /** {request ID -> thread} */ + private ConcurrentMap<Long, Thread> threads = new ConcurrentHashMap8<>(); + + /** {request ID -> future} */ + private ConcurrentMap<Long, GridCacheDistributedQueryFuture<?, ?, ?>> futs = + new ConcurrentHashMap8<>(); + + /** Received requests to cancel. */ + private Collection<CancelMessageId> cancelIds = + new GridBoundedConcurrentOrderedSet<>(MAX_CANCEL_IDS); + + /** Canceled queries. */ + private Collection<Long> cancelled = new GridBoundedConcurrentOrderedSet<>(MAX_CANCEL_IDS); + + /** Query response handler. 
*/ + private IgniteBiInClosure<UUID,GridCacheQueryResponse<K,V>> resHnd = new CI2<UUID, GridCacheQueryResponse<K, V>>() { + @Override public void apply(UUID nodeId, GridCacheQueryResponse<K, V> res) { + processQueryResponse(nodeId, res); + } + }; + + /** {@inheritDoc} */ + @Override public void start0() throws IgniteCheckedException { + super.start0(); + + assert cctx.config().getCacheMode() != LOCAL; + + cctx.io().addHandler(cctx.cacheId(), GridCacheQueryRequest.class, new CI2<UUID, GridCacheQueryRequest<K, V>>() { + @Override public void apply(UUID nodeId, GridCacheQueryRequest<K, V> req) { + processQueryRequest(nodeId, req); + } + }); + + cctx.events().addListener(new GridLocalEventListener() { + @Override public void onEvent(IgniteEvent evt) { + IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt; + + for (GridCacheDistributedQueryFuture fut : futs.values()) + fut.onNodeLeft(discoEvt.eventNode().id()); + } + }, EVT_NODE_LEFT, EVT_NODE_FAILED); + } + + /** {@inheritDoc} */ + @Override public void printMemoryStats() { + super.printMemoryStats(); + + X.println(">>> threadsSize: " + threads.size()); + X.println(">>> futsSize: " + futs.size()); + } + + /** + * Removes query future from futures map. + * + * @param reqId Request id. + * @param fut Query future. + */ + protected void addQueryFuture(long reqId, GridCacheDistributedQueryFuture<?, ?, ?> fut) { + futs.put(reqId, fut); + } + + /** + * Removes query future from futures map. + * + * @param reqId Request id. + */ + protected void removeQueryFuture(long reqId) { + futs.remove(reqId); + } + + /** + * Gets query future from futures map. + * + * @param reqId Request id. + * @return Found future or null. + */ + protected GridCacheDistributedQueryFuture<?, ?, ?> getQueryFuture(long reqId) { + return futs.get(reqId); + } + + /** + * Processes cache query request. + * + * @param sndId Sender node id. + * @param req Query request. 
+ */ + @SuppressWarnings("unchecked") + @Override void processQueryRequest(UUID sndId, GridCacheQueryRequest req) { + if (req.cancel()) { + cancelIds.add(new CancelMessageId(req.id(), sndId)); + + if (req.fields()) + removeFieldsQueryResult(sndId, req.id()); + else + removeQueryResult(sndId, req.id()); + } + else { + if (!cancelIds.contains(new CancelMessageId(req.id(), sndId))) { + if (!F.eq(req.cacheName(), cctx.name())) { + GridCacheQueryResponse res = new GridCacheQueryResponse( + cctx.cacheId(), + req.id(), + new IgniteCheckedException("Received request for incorrect cache [expected=" + cctx.name() + + ", actual=" + req.cacheName())); + + sendQueryResponse(sndId, res, 0); + } + else { + threads.put(req.id(), Thread.currentThread()); + + try { + GridCacheQueryInfo info = distributedQueryInfo(sndId, req); + + if (info == null) + return; + + if (req.fields()) + runFieldsQuery(info); + else + runQuery(info); + } + catch (Throwable e) { + U.error(log(), "Failed to run query.", e); + + sendQueryResponse(sndId, new GridCacheQueryResponse(cctx.cacheId(), req.id(), e.getCause()), 0); + } + finally { + threads.remove(req.id()); + } + } + } + } + } + + /** + * @param sndId Sender node id. + * @param req Query request. + * @return Query info. + * @throws ClassNotFoundException If class not found. + */ + @Nullable private GridCacheQueryInfo distributedQueryInfo(UUID sndId, GridCacheQueryRequest<K, V> req) + throws ClassNotFoundException { + IgnitePredicate<CacheEntry<Object, Object>> prjPred = req.projectionFilter() == null ? 
+ F.<CacheEntry<Object, Object>>alwaysTrue() : req.projectionFilter(); + + IgniteReducer<Object, Object> rdc = req.reducer(); + IgniteClosure<Object, Object> trans = req.transformer(); + + ClusterNode sndNode = cctx.node(sndId); + + if (sndNode == null) + return null; + + GridCacheQueryAdapter<?> qry = + new GridCacheQueryAdapter<>( + cctx, + prjPred, + req.type(), + log, + req.pageSize(), + 0, + false, + req.includeBackups(), + false, + null, + req.keyValueFilter(), + req.className(), + req.clause(), + req.includeMetaData(), + req.keepPortable(), + req.subjectId(), + req.taskHash() + ); + + return new GridCacheQueryInfo( + false, + prjPred, + trans, + rdc, + qry, + null, + sndId, + req.id(), + req.includeMetaData(), + req.allPages(), + req.arguments() + ); + } + + /** + * Sends cache query response. + * + * @param nodeId Node to send response. + * @param res Cache query response. + * @param timeout Message timeout. + * @return {@code true} if response was sent, {@code false} otherwise. + */ + private boolean sendQueryResponse(UUID nodeId, GridCacheQueryResponse<K, V> res, long timeout) { + ClusterNode node = cctx.node(nodeId); + + if (node == null) + return false; + + int attempt = 1; + + IgniteCheckedException err = null; + + while (!Thread.currentThread().isInterrupted()) { + try { + if (log.isDebugEnabled()) + log.debug("Send query response: " + res); + + Object topic = topic(nodeId, res.requestId()); + + cctx.io().sendOrderedMessage( + node, + topic, - cctx.io().messageId(topic, nodeId), + res, + timeout > 0 ? 
timeout : Long.MAX_VALUE); + + return true; + } + catch (ClusterTopologyException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to send query response since node left grid [nodeId=" + nodeId + + ", res=" + res + "]"); + + return false; + } + catch (IgniteCheckedException e) { + if (err == null) + err = e; + + if (Thread.currentThread().isInterrupted()) + break; + + if (attempt < RESEND_ATTEMPTS) { + if (log.isDebugEnabled()) + log.debug("Failed to send queries response (will try again) [nodeId=" + nodeId + ", res=" + + res + ", attempt=" + attempt + ", err=" + e + "]"); + + if (!Thread.currentThread().isInterrupted()) + try { + U.sleep(RESEND_FREQ); + } + catch (IgniteInterruptedException e1) { + U.error(log, + "Waiting for queries response resending was interrupted (response will not be sent) " + + "[nodeId=" + nodeId + ", response=" + res + "]", e1); + + return false; + } + } + else { + U.error(log, "Failed to sender cache response [nodeId=" + nodeId + ", response=" + res + "]", err); + + return false; + } + } + + attempt++; + } + + return false; + } + - /** {@inheritDoc} */ - @Override protected void removeQueryResult(@Nullable UUID sndId, long reqId) { - super.removeQueryResult(sndId, reqId); - - if (sndId != null) { - Object topic = topic(sndId, reqId); - - cctx.io().removeMessageId(topic); - } - } - - /** {@inheritDoc} */ - @Override protected void removeFieldsQueryResult(@Nullable UUID sndId, long reqId) { - super.removeFieldsQueryResult(sndId, reqId); - - if (sndId != null) { - Object topic = topic(sndId, reqId); - - cctx.io().removeMessageId(topic); - } - } - + /** + * Processes cache query response. + * + * @param sndId Sender node id. + * @param res Query response. 
+ */ + @SuppressWarnings("unchecked") + private void processQueryResponse(UUID sndId, GridCacheQueryResponse res) { + if (log.isDebugEnabled()) + log.debug("Received query response: " + res); + + GridCacheQueryFutureAdapter fut = getQueryFuture(res.requestId()); + + if (fut != null) + if (res.fields()) + ((GridCacheDistributedFieldsQueryFuture)fut).onPage( + sndId, res.metadata(), res.data(), res.error(), res.isFinished()); + else + fut.onPage(sndId, res.data(), res.error(), res.isFinished()); + else if (!cancelled.contains(res.requestId())) + U.warn(log, "Received response for finished or unknown query [rmtNodeId=" + sndId + + ", res=" + res + ']'); + } + + /** {@inheritDoc} */ + @Override void onQueryFutureCanceled(long reqId) { + cancelled.add(reqId); + } + + /** {@inheritDoc} */ + @Override void onCancelAtStop() { + super.onCancelAtStop(); + + for (GridCacheQueryFutureAdapter fut : futs.values()) + try { + fut.cancel(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to cancel running query future: " + fut, e); + } + + U.interrupt(threads.values()); + } + + /** {@inheritDoc} */ + @Override void onWaitAtStop() { + super.onWaitAtStop(); + + // Wait till all requests will be finished. 
+ for (GridCacheQueryFutureAdapter fut : futs.values()) + try { + fut.get(); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Received query error while waiting for query to finish [queryFuture= " + fut + + ", error= " + e + ']'); + } + } + + /** {@inheritDoc} */ + @Override protected boolean onPageReady(boolean loc, GridCacheQueryInfo qryInfo, + Collection<?> data, boolean finished, Throwable e) { + GridCacheLocalQueryFuture<?, ?, ?> fut = qryInfo.localQueryFuture(); + + if (loc) + assert fut != null; + + if (e != null) { + if (loc) + fut.onPage(null, null, e, true); + else + sendQueryResponse(qryInfo.senderId(), + new GridCacheQueryResponse<K, V>(cctx.cacheId(), qryInfo.requestId(), e), + qryInfo.query().timeout()); + + return true; + } + + if (loc) + fut.onPage(null, data, null, finished); + else { + GridCacheQueryResponse<K, V> res = new GridCacheQueryResponse<>(cctx.cacheId(), qryInfo.requestId(), + /*finished*/false, /*fields*/false); + + res.data(data); + res.finished(finished); + + if (!sendQueryResponse(qryInfo.senderId(), res, qryInfo.query().timeout())) + return false; + } + + return true; + } + + /** {@inheritDoc} */ + @Override protected boolean onFieldsPageReady(boolean loc, GridCacheQueryInfo qryInfo, + @Nullable List<GridQueryFieldMetadata> metadata, + @Nullable Collection<?> entities, + @Nullable Collection<?> data, + boolean finished, @Nullable Throwable e) { + assert qryInfo != null; + + if (e != null) { + if (loc) { + GridCacheLocalFieldsQueryFuture fut = (GridCacheLocalFieldsQueryFuture)qryInfo.localQueryFuture(); + + fut.onPage(null, null, null, e, true); + } + else + sendQueryResponse(qryInfo.senderId(), + new GridCacheQueryResponse<K, V>(cctx.cacheId(), qryInfo.requestId(), e), + qryInfo.query().timeout()); + + return true; + } + + if (loc) { + GridCacheLocalFieldsQueryFuture fut = (GridCacheLocalFieldsQueryFuture)qryInfo.localQueryFuture(); + + fut.onPage(null, metadata, data, null, finished); + } + else { + 
GridCacheQueryResponse<K, V> res = new GridCacheQueryResponse<>(cctx.cacheId(), qryInfo.requestId(), + finished, qryInfo.reducer() == null); + + res.metadata(metadata); + res.data(entities != null ? entities : data); + + if (!sendQueryResponse(qryInfo.senderId(), res, qryInfo.query().timeout())) + return false; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public CacheQueryFuture<?> queryLocal(GridCacheQueryBean qry) { + assert cctx.config().getCacheMode() != LOCAL; + + if (log.isDebugEnabled()) + log.debug("Executing query on local node: " + qry); + + GridCacheLocalQueryFuture<K, V, ?> fut = new GridCacheLocalQueryFuture<>(cctx, qry); + + try { + qry.query().validate(); + + fut.execute(); + } + catch (IgniteCheckedException e) { + fut.onDone(e); + } + + return fut; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes) { + assert cctx.config().getCacheMode() != LOCAL; + + if (log.isDebugEnabled()) + log.debug("Executing distributed query: " + qry); + + long reqId = cctx.io().nextIoId(); + + final GridCacheDistributedQueryFuture<K, V, ?> fut = + new GridCacheDistributedQueryFuture<>(cctx, reqId, qry, nodes); + + try { + qry.query().validate(); + + String clsName = qry.query().queryClassName(); + + GridCacheQueryRequest req = new GridCacheQueryRequest( + cctx.cacheId(), + reqId, + cctx.name(), + qry.query().type(), + false, + qry.query().clause(), + clsName, + qry.query().scanFilter(), + qry.query().projectionFilter(), + qry.reducer(), + qry.transform(), + qry.query().pageSize(), + qry.query().includeBackups(), + qry.arguments(), + false, + qry.query().keepPortable(), + qry.query().subjectId(), + qry.query().taskHash()); + + addQueryFuture(req.id(), fut); + + final Object topic = topic(cctx.nodeId(), req.id()); + + cctx.io().addOrderedHandler(topic, resHnd); + + fut.listenAsync(new CI1<IgniteFuture<?>>() { + @Override public void 
apply(IgniteFuture<?> fut) { + cctx.io().removeOrderedHandler(topic); + } + }); + + sendRequest(fut, req, nodes); + } + catch (IgniteCheckedException e) { + fut.onDone(e); + } + + return fut; + } + + /** {@inheritDoc} */ + @Override public void loadPage(long id, GridCacheQueryAdapter<?> qry, Collection<ClusterNode> nodes, boolean all) { + assert cctx.config().getCacheMode() != LOCAL; + assert qry != null; + assert nodes != null; + + GridCacheDistributedQueryFuture<?, ?, ?> fut = futs.get(id); + + assert fut != null; + + try { + GridCacheQueryRequest<K, V> req = new GridCacheQueryRequest<>( + cctx.cacheId(), + id, + cctx.name(), + qry.pageSize(), + qry.includeBackups(), + fut.fields(), + all, + qry.keepPortable(), + qry.subjectId(), + qry.taskHash()); + + sendRequest(fut, req, nodes); + } + catch (IgniteCheckedException e) { + fut.onDone(e); + } + } + + /** {@inheritDoc} */ + @Override public CacheQueryFuture<?> queryFieldsLocal(GridCacheQueryBean qry) { + assert cctx.config().getCacheMode() != LOCAL; + + if (log.isDebugEnabled()) + log.debug("Executing query on local node: " + qry); + + GridCacheLocalFieldsQueryFuture fut = new GridCacheLocalFieldsQueryFuture(cctx, qry); + + try { + qry.query().validate(); + + fut.execute(); + } + catch (IgniteCheckedException e) { + fut.onDone(e); + } + + return fut; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public CacheQueryFuture<?> queryFieldsDistributed(GridCacheQueryBean qry, + Collection<ClusterNode> nodes) { + assert cctx.config().getCacheMode() != LOCAL; + + if (log.isDebugEnabled()) + log.debug("Executing distributed query: " + qry); + + long reqId = cctx.io().nextIoId(); + + final GridCacheDistributedFieldsQueryFuture fut = + new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry, nodes); + + try { + qry.query().validate(); + + GridCacheQueryRequest req = new GridCacheQueryRequest( + cctx.cacheId(), + reqId, + cctx.name(), + qry.query().type(), + true, + qry.query().clause(), + null, +
null, + qry.query().projectionFilter(), + qry.reducer(), + qry.transform(), + qry.query().pageSize(), + qry.query().includeBackups(), + qry.arguments(), + qry.query().includeMetadata(), + qry.query().keepPortable(), + qry.query().subjectId(), + qry.query().taskHash()); + + addQueryFuture(req.id(), fut); + + final Object topic = topic(cctx.nodeId(), req.id()); + + cctx.io().addOrderedHandler(topic, resHnd); + + fut.listenAsync(new CI1<IgniteFuture<?>>() { + @Override public void apply(IgniteFuture<?> fut) { + cctx.io().removeOrderedHandler(topic); + } + }); + + sendRequest(fut, req, nodes); + } + catch (IgniteCheckedException e) { + fut.onDone(e); + } + + return fut; + } + + /** + * Sends query request. + * + * @param fut Distributed future. + * @param req Request. + * @param nodes Nodes. + * @throws IgniteCheckedException In case of error. + * @deprecated Need to remove nodes filtration after breaking compatibility. + */ + @Deprecated + @SuppressWarnings("unchecked") + private void sendRequest( + final GridCacheDistributedQueryFuture<?, ?, ?> fut, + final GridCacheQueryRequest<K, V> req, + Collection<ClusterNode> nodes + ) throws IgniteCheckedException { + assert fut != null; + assert req != null; + assert nodes != null; + + final UUID locNodeId = cctx.localNodeId(); + + ClusterNode locNode = null; + + Collection<ClusterNode> rmtNodes = null; + + for (ClusterNode n : nodes) { + if (n.id().equals(locNodeId)) + locNode = n; + else { + if (rmtNodes == null) + rmtNodes = new ArrayList<>(nodes.size()); + + rmtNodes.add(n); + } + } + + // The request must be sent to remote nodes before the query is processed on the local node: + // a reducer may be stateful, so it has to be serialized and sent before local execution + // gets a chance to modify it.
+ if (!F.isEmpty(rmtNodes)) { + cctx.io().safeSend(rmtNodes, req, new P1<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + fut.onNodeLeft(node.id()); + + return !fut.isDone(); + } + }); + } + + if (locNode != null) { + cctx.closures().callLocalSafe(new Callable<Object>() { + @Override public Object call() throws Exception { + req.beforeLocalExecution(cctx); + + processQueryRequest(locNodeId, req); + + return null; + } + }); + } + } + + /** + * Gets topic for ordered response messages. + * + * @param nodeId Node ID. + * @param reqId Request ID. + * @return Topic. + */ + private Object topic(UUID nodeId, long reqId) { + return TOPIC_CACHE.topic(TOPIC_PREFIX, nodeId, reqId); + } + + /** + * Cancel message ID. + */ + private static class CancelMessageId implements Comparable<CancelMessageId> { + /** Message ID. */ + private final long reqId; + + /** Node ID. */ + private final UUID nodeId; + + /** + * @param reqId Message ID. + * @param nodeId Node ID. + */ + private CancelMessageId(long reqId, UUID nodeId) { + this.reqId = reqId; + this.nodeId = nodeId; + } + + /** + * {@inheritDoc} + * <p> + * Orders by request ID (ascending), then by node ID in reverse {@link UUID} order. + */ + @Override public int compareTo(CancelMessageId m) { + if (m.reqId == reqId) + return m.nodeId.compareTo(nodeId); + + return reqId < m.reqId ? -1 : 1; + } + + /** + * {@inheritDoc} + * <p> + * Returns {@code false} for {@code null} or for an object of another type, + * as required by the {@link Object#equals(Object)} contract. + */ + @Override public boolean equals(Object obj) { + if (obj == this) + return true; + + if (!(obj instanceof CancelMessageId)) + return false; + + CancelMessageId other = (CancelMessageId)obj; + + return reqId == other.reqId && nodeId.equals(other.nodeId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return 31 * ((int)(reqId ^ (reqId >>> 32))) + nodeId.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CancelMessageId.class, this); + } + } + }