Repository: incubator-ignite Updated Branches: refs/heads/ignite-301 c91fd9dcf -> 8d50c599d
# IGNITE-301 Rename IgniteClusterImpl to ClusterProcessor. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8d50c599 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8d50c599 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8d50c599 Branch: refs/heads/ignite-301 Commit: 8d50c599dc32fadb1c546c8667d35275f6f9b0fd Parents: c91fd9d Author: sevdokimov <sergey.evdoki...@jetbrains.com> Authored: Sun Mar 1 20:23:22 2015 +0300 Committer: sevdokimov <sergey.evdoki...@jetbrains.com> Committed: Sun Mar 1 20:23:22 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridKernalContext.java | 2 +- .../ignite/internal/GridKernalContextImpl.java | 8 +- .../apache/ignite/internal/IgniteKernal.java | 2 +- .../internal/cluster/ClusterProcessor.java | 579 +++++++++++++++++++ .../cluster/IgniteClusterAsyncImpl.java | 6 +- .../internal/cluster/IgniteClusterImpl.java | 579 ------------------- 6 files changed, 588 insertions(+), 588 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index c342e59..5935658 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -515,5 +515,5 @@ public interface GridKernalContext extends Iterable<GridComponent> { * * @return Cluster processor. */ - public IgniteClusterImpl cluster(); + public ClusterProcessor cluster(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index e8c3e4f..bb85818 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -246,7 +246,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude - private IgniteClusterImpl cluster; + private ClusterProcessor cluster; /** */ @GridToStringExclude @@ -466,8 +466,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable qryProc = (GridQueryProcessor)comp; else if (comp instanceof DataStructuresProcessor) dataStructuresProc = (DataStructuresProcessor)comp; - else if (comp instanceof IgniteClusterImpl) - cluster = (IgniteClusterImpl)comp; + else if (comp instanceof ClusterProcessor) + cluster = (ClusterProcessor)comp; else assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass(); @@ -861,7 +861,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ - @Override public IgniteClusterImpl cluster() { + @Override public ClusterProcessor cluster() { return cluster; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index e3e0b0a..ee0428d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -674,7 +674,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { igfsExecSvc, restExecSvc); - startProcessor(ctx, new IgniteClusterImpl(ctx), attrs); + startProcessor(ctx, new ClusterProcessor(ctx), attrs); U.onGridStart(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterProcessor.java new file mode 100644 index 0000000..413b108 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterProcessor.java @@ -0,0 +1,579 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cluster; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.nodestart.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.IgniteNodeAttributes.*; +import static org.apache.ignite.internal.util.nodestart.IgniteNodeStartUtils.*; + +/** + * + */ +public class ClusterProcessor extends ClusterGroupAdapter implements IgniteClusterEx, GridProcessor { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private IgniteConfiguration cfg; + + /** Node local store. */ + @GridToStringExclude + private ConcurrentMap nodeLoc; + + /** + * Required by {@link Externalizable}. + */ + public ClusterProcessor() { + // No-op. + } + + /** + * @param ctx Kernal context. + */ + public ClusterProcessor(GridKernalContext ctx) { + super(ctx, null, (IgnitePredicate<ClusterNode>)null); + + cfg = ctx.config(); + + nodeLoc = new ClusterNodeLocalMapImpl(ctx); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forLocal() { + guard(); + + try { + return new ClusterGroupAdapter(ctx, null, Collections.singleton(cfg.getNodeId())); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public ClusterNode localNode() { + guard(); + + try { + ClusterNode node = ctx.discovery().localNode(); + + assert node != null; + + return node; + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <K, V> ConcurrentMap<K, V> nodeLocalMap() { + guard(); + + try { + return nodeLoc; + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public boolean pingNode(UUID nodeId) { + A.notNull(nodeId, "nodeId"); + + guard(); + + try { + return ctx.discovery().pingNode(nodeId); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public long topologyVersion() { + guard(); + + try { + return ctx.discovery().topologyVersion(); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> topology(long topVer) throws UnsupportedOperationException { + guard(); + + try { + return ctx.discovery().topology(topVer); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable String cacheName, + @Nullable Collection<? extends K> keys) + throws IgniteException + { + if (F.isEmpty(keys)) + return Collections.emptyMap(); + + guard(); + + try { + return ctx.affinity().mapKeysToNodes(cacheName, keys); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key) throws IgniteException { + A.notNull(key, "key"); + + guard(); + + try { + return ctx.affinity().mapKeyToNode(cacheName, key); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(File file, + boolean restart, + int timeout, + int maxConn) + throws IgniteException + { + try { + return startNodesAsync(file, restart, timeout, maxConn).get(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(Collection<Map<String, Object>> hosts, + @Nullable Map<String, Object> dflts, + boolean restart, + int timeout, + int maxConn) + throws IgniteException + { + try { + return startNodesAsync(hosts, dflts, restart, timeout, maxConn).get(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public void stopNodes() throws IgniteException { + guard(); + + try { + compute().execute(IgniteKillTask.class, false); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public void stopNodes(Collection<UUID> ids) throws IgniteException { + guard(); + + try { + ctx.grid().compute(forNodeIds(ids)).execute(IgniteKillTask.class, false); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public void restartNodes() throws IgniteException { + guard(); + + try { + compute().execute(IgniteKillTask.class, true); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public void restartNodes(Collection<UUID> ids) throws IgniteException { + guard(); + + try { + ctx.grid().compute(forNodeIds(ids)).execute(IgniteKillTask.class, true); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public void resetMetrics() { + guard(); + + try { + ctx.jobMetric().reset(); + ctx.io().resetMetrics(); + ctx.task().resetMetrics(); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public IgniteCluster withAsync() { + return new IgniteClusterAsyncImpl(this); + } + + /** {@inheritDoc} */ + @Override public boolean isAsync() { + return false; + } + + /** {@inheritDoc} */ + @Override public <R> IgniteFuture<R> future() { + throw new IllegalStateException("Asynchronous mode is not enabled."); + } + + /** + * @param file Configuration file. + * @param restart Whether to stop existing nodes. + * @param timeout Connection timeout. + * @param maxConn Number of parallel SSH connections to one host. + * @return Future with results. + * @see {@link IgniteCluster#startNodes(java.io.File, boolean, int, int)}. + */ + IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync(File file, + boolean restart, + int timeout, + int maxConn) + { + A.notNull(file, "file"); + A.ensure(file.exists(), "file doesn't exist."); + A.ensure(file.isFile(), "file is a directory."); + + try { + IgniteBiTuple<Collection<Map<String, Object>>, Map<String, Object>> t = parseFile(file); + + return startNodesAsync(t.get1(), t.get2(), restart, timeout, maxConn); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(ctx, e); + } + } + + /** + * @param hosts Startup parameters. + * @param dflts Default values. + * @param restart Whether to stop existing nodes + * @param timeout Connection timeout in milliseconds. + * @param maxConn Number of parallel SSH connections to one host. + * @return Future with results. + * @see {@link IgniteCluster#startNodes(java.util.Collection, java.util.Map, boolean, int, int)}. + */ + IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync( + Collection<Map<String, Object>> hosts, + @Nullable Map<String, Object> dflts, + boolean restart, + int timeout, + int maxConn) + { + A.notNull(hosts, "hosts"); + + guard(); + + try { + IgniteSshProcessor sshProcessor = IgniteComponentType.SSH.create(false); + + Map<String, Collection<IgniteRemoteStartSpecification>> specsMap = specifications(hosts, dflts); + + Map<String, ConcurrentLinkedQueue<IgniteNodeCallable>> runMap = new HashMap<>(); + + int nodeCallCnt = 0; + + for (String host : specsMap.keySet()) { + InetAddress addr; + + try { + addr = InetAddress.getByName(host); + } + catch (UnknownHostException e) { + throw new IgniteCheckedException("Invalid host name: " + host, e); + } + + Collection<? extends ClusterNode> neighbors = null; + + if (addr.isLoopbackAddress()) + neighbors = neighbors(); + else { + for (Collection<ClusterNode> p : U.neighborhood(nodes()).values()) { + ClusterNode node = F.first(p); + + if (node.<String>attribute(ATTR_IPS).contains(addr.getHostAddress())) { + neighbors = p; + + break; + } + } + } + + int startIdx = 1; + + if (neighbors != null) { + if (restart && !neighbors.isEmpty()) { + try { + ctx.grid().compute(forNodes(neighbors)).execute(IgniteKillTask.class, false); + } + catch (ClusterGroupEmptyException ignored) { + // No-op, nothing to restart. + } + } + else + startIdx = neighbors.size() + 1; + } + + ConcurrentLinkedQueue<IgniteNodeCallable> nodeRuns = new ConcurrentLinkedQueue<>(); + + runMap.put(host, nodeRuns); + + for (IgniteRemoteStartSpecification spec : specsMap.get(host)) { + assert spec.host().equals(host); + + for (int i = startIdx; i <= spec.nodes(); i++) { + nodeRuns.add(sshProcessor.nodeStartCallable(spec, timeout)); + + nodeCallCnt++; + } + } + } + + // If there is nothing to start, return finished future with empty result. + if (nodeCallCnt == 0) + return new GridFinishedFuture<Collection<GridTuple3<String, Boolean, String>>>( + ctx, Collections.<GridTuple3<String, Boolean, String>>emptyList()); + + // Exceeding max line width for readability. + GridCompoundFuture<GridTuple3<String, Boolean, String>, Collection<GridTuple3<String, Boolean, String>>> + fut = new GridCompoundFuture<>( + ctx, + CU.<GridTuple3<String, Boolean, String>>objectsReducer() + ); + + AtomicInteger cnt = new AtomicInteger(nodeCallCnt); + + // Limit maximum simultaneous connection number per host. + for (ConcurrentLinkedQueue<IgniteNodeCallable> queue : runMap.values()) { + for (int i = 0; i < maxConn; i++) { + if (!runNextNodeCallable(queue, fut, cnt)) + break; + } + } + + return fut; + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(ctx, e); + } + finally { + unguard(); + } + } + + /** + * Gets the all grid nodes that reside on the same physical computer as local grid node. + * Local grid node is excluded. + * <p> + * Detection of the same physical computer is based on comparing set of network interface MACs. + * If two nodes have the same set of MACs, Ignite considers these nodes running on the same + * physical computer. + * @return Grid nodes that reside on the same physical computer as local grid node. + */ + private Collection<ClusterNode> neighbors() { + Collection<ClusterNode> neighbors = new ArrayList<>(1); + + String macs = localNode().attribute(ATTR_MACS); + + assert macs != null; + + for (ClusterNode n : forOthers(localNode()).nodes()) { + if (macs.equals(n.attribute(ATTR_MACS))) + neighbors.add(n); + } + + return neighbors; + } + + /** + * Runs next callable from host node start queue. + * + * @param queue Queue of tasks to poll from. + * @param comp Compound future that comprise all started node tasks. + * @param cnt Atomic counter to check if all futures are added to compound future. + * @return {@code True} if task was started, {@code false} if queue was empty. + */ + private boolean runNextNodeCallable(final ConcurrentLinkedQueue<IgniteNodeCallable> queue, + final GridCompoundFuture<GridTuple3<String, Boolean, String>, Collection<GridTuple3<String, Boolean, String>>> + comp, + final AtomicInteger cnt) + { + IgniteNodeCallable call = queue.poll(); + + if (call == null) + return false; + + IgniteInternalFuture<GridTuple3<String, Boolean, String>> fut = ctx.closure().callLocalSafe(call, true); + + comp.add(fut); + + if (cnt.decrementAndGet() == 0) + comp.markInitialized(); + + fut.listenAsync(new CI1<IgniteInternalFuture<GridTuple3<String, Boolean, String>>>() { + @Override public void apply(IgniteInternalFuture<GridTuple3<String, Boolean, String>> f) { + runNextNodeCallable(queue, comp, cnt); + } + }); + + return true; + } + + /** + * Clears node local map. + */ + public void clearNodeMap() { + nodeLoc.clear(); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + ctx = (GridKernalContext)in.readObject(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ctx); + } + + /** {@inheritDoc} */ + @Override protected Object readResolve() throws ObjectStreamException { + return ctx.grid().cluster(); + } + + /** {@inheritDoc} */ + @Override public void addAttributes(Map<String, Object> attrs) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onKernalStart() throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + // No-op. + } + + /** {@inheritDoc} */ + @Nullable @Override public Object collectDiscoveryData(UUID nodeId) { + return null; + } + + /** {@inheritDoc} */ + @Override public void onDiscoveryDataReceived(UUID nodeId, Object data) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void printMemoryStats() { + // No-op. + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteSpiNodeValidationResult validateNode(ClusterNode node) { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { + return null; + } + + /** {@inheritDoc} */ + public String toString() { + return "IgniteCluster [igniteName=" + ctx.gridName() + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java index 960bacd..43a6435 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java @@ -38,7 +38,7 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> private static final long serialVersionUID = 0L; /** */ - private IgniteClusterImpl cluster; + private ClusterProcessor cluster; /** * Required by {@link Externalizable}. @@ -50,7 +50,7 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> /** * @param cluster Cluster. */ - public IgniteClusterAsyncImpl(IgniteClusterImpl cluster) { + public IgniteClusterAsyncImpl(ClusterProcessor cluster) { super(true); this.cluster = cluster; @@ -274,7 +274,7 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cluster = (IgniteClusterImpl)in.readObject(); + cluster = (ClusterProcessor)in.readObject(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java deleted file mode 100644 index 45462a4..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java +++ /dev/null @@ -1,579 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.cluster; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.nodestart.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.IgniteNodeAttributes.*; -import static org.apache.ignite.internal.util.nodestart.IgniteNodeStartUtils.*; - -/** - * - */ -public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClusterEx, GridProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private IgniteConfiguration cfg; - - /** Node local store. */ - @GridToStringExclude - private ConcurrentMap nodeLoc; - - /** - * Required by {@link Externalizable}. - */ - public IgniteClusterImpl() { - // No-op. - } - - /** - * @param ctx Kernal context. - */ - public IgniteClusterImpl(GridKernalContext ctx) { - super(ctx, null, (IgnitePredicate<ClusterNode>)null); - - cfg = ctx.config(); - - nodeLoc = new ClusterNodeLocalMapImpl(ctx); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forLocal() { - guard(); - - try { - return new ClusterGroupAdapter(ctx, null, Collections.singleton(cfg.getNodeId())); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public ClusterNode localNode() { - guard(); - - try { - ClusterNode node = ctx.discovery().localNode(); - - assert node != null; - - return node; - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public <K, V> ConcurrentMap<K, V> nodeLocalMap() { - guard(); - - try { - return nodeLoc; - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public boolean pingNode(UUID nodeId) { - A.notNull(nodeId, "nodeId"); - - guard(); - - try { - return ctx.discovery().pingNode(nodeId); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public long topologyVersion() { - guard(); - - try { - return ctx.discovery().topologyVersion(); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public Collection<ClusterNode> topology(long topVer) throws UnsupportedOperationException { - guard(); - - try { - return ctx.discovery().topology(topVer); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable String cacheName, - @Nullable Collection<? extends K> keys) - throws IgniteException - { - if (F.isEmpty(keys)) - return Collections.emptyMap(); - - guard(); - - try { - return ctx.affinity().mapKeysToNodes(cacheName, keys); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key) throws IgniteException { - A.notNull(key, "key"); - - guard(); - - try { - return ctx.affinity().mapKeyToNode(cacheName, key); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(File file, - boolean restart, - int timeout, - int maxConn) - throws IgniteException - { - try { - return startNodesAsync(file, restart, timeout, maxConn).get(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(Collection<Map<String, Object>> hosts, - @Nullable Map<String, Object> dflts, - boolean restart, - int timeout, - int maxConn) - throws IgniteException - { - try { - return startNodesAsync(hosts, dflts, restart, timeout, maxConn).get(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public void stopNodes() throws IgniteException { - guard(); - - try { - compute().execute(IgniteKillTask.class, false); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public void stopNodes(Collection<UUID> ids) throws IgniteException { - guard(); - - try { - ctx.grid().compute(forNodeIds(ids)).execute(IgniteKillTask.class, false); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public void restartNodes() throws IgniteException { - guard(); - - try { - compute().execute(IgniteKillTask.class, true); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public void restartNodes(Collection<UUID> ids) throws IgniteException { - guard(); - - try { - ctx.grid().compute(forNodeIds(ids)).execute(IgniteKillTask.class, true); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public void resetMetrics() { - guard(); - - try { - ctx.jobMetric().reset(); - ctx.io().resetMetrics(); - ctx.task().resetMetrics(); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public IgniteCluster withAsync() { - return new IgniteClusterAsyncImpl(this); - } - - /** {@inheritDoc} */ - @Override public boolean isAsync() { - return false; - } - - /** {@inheritDoc} */ - @Override public <R> IgniteFuture<R> future() { - throw new IllegalStateException("Asynchronous mode is not enabled."); - } - - /** - * @param file Configuration file. - * @param restart Whether to stop existing nodes. - * @param timeout Connection timeout. - * @param maxConn Number of parallel SSH connections to one host. - * @return Future with results. - * @see {@link IgniteCluster#startNodes(java.io.File, boolean, int, int)}. - */ - IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync(File file, - boolean restart, - int timeout, - int maxConn) - { - A.notNull(file, "file"); - A.ensure(file.exists(), "file doesn't exist."); - A.ensure(file.isFile(), "file is a directory."); - - try { - IgniteBiTuple<Collection<Map<String, Object>>, Map<String, Object>> t = parseFile(file); - - return startNodesAsync(t.get1(), t.get2(), restart, timeout, maxConn); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(ctx, e); - } - } - - /** - * @param hosts Startup parameters. - * @param dflts Default values. - * @param restart Whether to stop existing nodes - * @param timeout Connection timeout in milliseconds. - * @param maxConn Number of parallel SSH connections to one host. - * @return Future with results. - * @see {@link IgniteCluster#startNodes(java.util.Collection, java.util.Map, boolean, int, int)}. - */ - IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync( - Collection<Map<String, Object>> hosts, - @Nullable Map<String, Object> dflts, - boolean restart, - int timeout, - int maxConn) - { - A.notNull(hosts, "hosts"); - - guard(); - - try { - IgniteSshProcessor sshProcessor = IgniteComponentType.SSH.create(false); - - Map<String, Collection<IgniteRemoteStartSpecification>> specsMap = specifications(hosts, dflts); - - Map<String, ConcurrentLinkedQueue<IgniteNodeCallable>> runMap = new HashMap<>(); - - int nodeCallCnt = 0; - - for (String host : specsMap.keySet()) { - InetAddress addr; - - try { - addr = InetAddress.getByName(host); - } - catch (UnknownHostException e) { - throw new IgniteCheckedException("Invalid host name: " + host, e); - } - - Collection<? extends ClusterNode> neighbors = null; - - if (addr.isLoopbackAddress()) - neighbors = neighbors(); - else { - for (Collection<ClusterNode> p : U.neighborhood(nodes()).values()) { - ClusterNode node = F.first(p); - - if (node.<String>attribute(ATTR_IPS).contains(addr.getHostAddress())) { - neighbors = p; - - break; - } - } - } - - int startIdx = 1; - - if (neighbors != null) { - if (restart && !neighbors.isEmpty()) { - try { - ctx.grid().compute(forNodes(neighbors)).execute(IgniteKillTask.class, false); - } - catch (ClusterGroupEmptyException ignored) { - // No-op, nothing to restart. - } - } - else - startIdx = neighbors.size() + 1; - } - - ConcurrentLinkedQueue<IgniteNodeCallable> nodeRuns = new ConcurrentLinkedQueue<>(); - - runMap.put(host, nodeRuns); - - for (IgniteRemoteStartSpecification spec : specsMap.get(host)) { - assert spec.host().equals(host); - - for (int i = startIdx; i <= spec.nodes(); i++) { - nodeRuns.add(sshProcessor.nodeStartCallable(spec, timeout)); - - nodeCallCnt++; - } - } - } - - // If there is nothing to start, return finished future with empty result. - if (nodeCallCnt == 0) - return new GridFinishedFuture<Collection<GridTuple3<String, Boolean, String>>>( - ctx, Collections.<GridTuple3<String, Boolean, String>>emptyList()); - - // Exceeding max line width for readability. - GridCompoundFuture<GridTuple3<String, Boolean, String>, Collection<GridTuple3<String, Boolean, String>>> - fut = new GridCompoundFuture<>( - ctx, - CU.<GridTuple3<String, Boolean, String>>objectsReducer() - ); - - AtomicInteger cnt = new AtomicInteger(nodeCallCnt); - - // Limit maximum simultaneous connection number per host. - for (ConcurrentLinkedQueue<IgniteNodeCallable> queue : runMap.values()) { - for (int i = 0; i < maxConn; i++) { - if (!runNextNodeCallable(queue, fut, cnt)) - break; - } - } - - return fut; - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(ctx, e); - } - finally { - unguard(); - } - } - - /** - * Gets the all grid nodes that reside on the same physical computer as local grid node. - * Local grid node is excluded. - * <p> - * Detection of the same physical computer is based on comparing set of network interface MACs. - * If two nodes have the same set of MACs, Ignite considers these nodes running on the same - * physical computer. - * @return Grid nodes that reside on the same physical computer as local grid node. - */ - private Collection<ClusterNode> neighbors() { - Collection<ClusterNode> neighbors = new ArrayList<>(1); - - String macs = localNode().attribute(ATTR_MACS); - - assert macs != null; - - for (ClusterNode n : forOthers(localNode()).nodes()) { - if (macs.equals(n.attribute(ATTR_MACS))) - neighbors.add(n); - } - - return neighbors; - } - - /** - * Runs next callable from host node start queue. - * - * @param queue Queue of tasks to poll from. - * @param comp Compound future that comprise all started node tasks. - * @param cnt Atomic counter to check if all futures are added to compound future. - * @return {@code True} if task was started, {@code false} if queue was empty. - */ - private boolean runNextNodeCallable(final ConcurrentLinkedQueue<IgniteNodeCallable> queue, - final GridCompoundFuture<GridTuple3<String, Boolean, String>, Collection<GridTuple3<String, Boolean, String>>> - comp, - final AtomicInteger cnt) - { - IgniteNodeCallable call = queue.poll(); - - if (call == null) - return false; - - IgniteInternalFuture<GridTuple3<String, Boolean, String>> fut = ctx.closure().callLocalSafe(call, true); - - comp.add(fut); - - if (cnt.decrementAndGet() == 0) - comp.markInitialized(); - - fut.listenAsync(new CI1<IgniteInternalFuture<GridTuple3<String, Boolean, String>>>() { - @Override public void apply(IgniteInternalFuture<GridTuple3<String, Boolean, String>> f) { - runNextNodeCallable(queue, comp, cnt); - } - }); - - return true; - } - - /** - * Clears node local map. - */ - public void clearNodeMap() { - nodeLoc.clear(); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - ctx = (GridKernalContext)in.readObject(); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(ctx); - } - - /** {@inheritDoc} */ - @Override protected Object readResolve() throws ObjectStreamException { - return ctx.grid().cluster(); - } - - /** {@inheritDoc} */ - @Override public void addAttributes(Map<String, Object> attrs) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - // No-op. - } - - /** {@inheritDoc} */ - @Nullable @Override public Object collectDiscoveryData(UUID nodeId) { - return null; - } - - /** {@inheritDoc} */ - @Override public void onDiscoveryDataReceived(UUID nodeId, Object data) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void printMemoryStats() { - // No-op. - } - - /** {@inheritDoc} */ - @Nullable @Override public IgniteSpiNodeValidationResult validateNode(ClusterNode node) { - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { - return null; - } - - /** {@inheritDoc} */ - public String toString() { - return "IgniteCluster [igniteName=" + ctx.gridName() + ']'; - } -}