// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
// diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
// new file mode 100644, index 0000000..9c736cf
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.cluster;

import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.executor.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

import static org.apache.ignite.internal.IgniteNodeAttributes.*;

/**
 * Base {@link ClusterGroupEx} implementation.
 * <p>
 * A group is described either by an explicit set of node IDs ({@code ids}) or by a node
 * predicate ({@code p}). When both are {@code null} the group covers every node reported by
 * discovery.
 * <p>
 * Only the grid name, the subject ID and the IDs (or, if there are none, the predicate) are
 * externalized; the kernal context is looked up again by grid name in {@link #readResolve()}.
 */
public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
    /** Serialization version. */
    private static final long serialVersionUID = 0L;

    /** Kernal context. */
    protected transient GridKernalContext ctx;

    /** Parent projection. */
    private transient ClusterGroup parent;

    /** Compute (created lazily). */
    private transient IgniteComputeImpl compute;

    /** Messaging (created lazily). */
    private transient IgniteMessagingImpl messaging;

    /** Events (created lazily). */
    private transient IgniteEvents evts;

    /** Services (created lazily). */
    private transient IgniteServices svcs;

    /** Grid name. */
    private String gridName;

    /** Subject ID. */
    private UUID subjId;

    /** Projection predicate. */
    protected IgnitePredicate<ClusterNode> p;

    /** Node IDs. */
    private Set<UUID> ids;

    /**
     * Required by {@link Externalizable}.
     */
    public ClusterGroupAdapter() {
        // No-op.
    }

    /**
     * Creates a predicate-based group.
     *
     * @param parent Parent of this projection.
     * @param ctx Grid kernal context.
     * @param subjId Subject ID.
     * @param p Predicate.
     */
    protected ClusterGroupAdapter(@Nullable ClusterGroup parent,
        @Nullable GridKernalContext ctx,
        @Nullable UUID subjId,
        @Nullable IgnitePredicate<ClusterNode> p)
    {
        this.parent = parent;

        if (ctx != null)
            setKernalContext(ctx);

        this.subjId = subjId;
        this.p = p;

        ids = null;
    }

    /**
     * Creates a group over an explicit set of node IDs; the predicate is derived from the IDs.
     *
     * @param parent Parent of this projection.
     * @param ctx Grid kernal context.
     * @param subjId Subject ID.
     * @param ids Node IDs (must not be {@code null}).
     */
    protected ClusterGroupAdapter(@Nullable ClusterGroup parent,
        @Nullable GridKernalContext ctx,
        @Nullable UUID subjId,
        Set<UUID> ids)
    {
        this.parent = parent;

        if (ctx != null)
            setKernalContext(ctx);

        assert ids != null;

        this.subjId = subjId;
        this.ids = ids;

        p = F.nodeForNodeIds(ids);
    }

    /**
     * Creates a group from both a predicate and a set of node IDs. If the predicate is
     * {@code null} and IDs are given, the predicate is derived from the IDs.
     *
     * @param parent Parent of this projection.
     * @param ctx Grid kernal context.
     * @param subjId Subject ID.
     * @param p Predicate.
     * @param ids Node IDs.
     */
    private ClusterGroupAdapter(@Nullable ClusterGroup parent,
        @Nullable GridKernalContext ctx,
        @Nullable UUID subjId,
        @Nullable IgnitePredicate<ClusterNode> p,
        Set<UUID> ids)
    {
        this.parent = parent;

        if (ctx != null)
            setKernalContext(ctx);

        this.subjId = subjId;
        this.p = p;
        this.ids = ids;

        if (p == null && ids != null)
            this.p = F.nodeForNodeIds(ids);
    }

    /**
     * <tt>ctx.gateway().readLock()</tt>
     */
    protected void guard() {
        assert ctx != null;

        ctx.gateway().readLock();
    }

    /**
     * <tt>ctx.gateway().readUnlock()</tt>
     */
    protected void unguard() {
        assert ctx != null;

        ctx.gateway().readUnlock();
    }

    /**
     * Sets kernal context. May be called only once per instance. Also records the grid name
     * and, if no parent was given, uses the grid's cluster as the parent.
     *
     * @param ctx Kernal context to set.
     */
    public void setKernalContext(GridKernalContext ctx) {
        assert ctx != null;
        assert this.ctx == null;

        this.ctx = ctx;

        if (parent == null)
            parent = ctx.grid().cluster();

        gridName = ctx.gridName();
    }

    /** {@inheritDoc} */
    @Override public final Ignite ignite() {
        assert ctx != null;

        guard();

        try {
            return ctx.grid();
        }
        finally {
            unguard();
        }
    }

    /**
     * @return {@link org.apache.ignite.IgniteCompute} for this projection.
     */
    public final IgniteCompute compute() {
        if (compute == null) {
            assert ctx != null;

            compute = new IgniteComputeImpl(ctx, this, subjId, false);
        }

        return compute;
    }

    /**
     * @return {@link org.apache.ignite.IgniteMessaging} for this projection.
     */
    public final IgniteMessaging message() {
        if (messaging == null) {
            assert ctx != null;

            messaging = new IgniteMessagingImpl(ctx, this, false);
        }

        return messaging;
    }

    /**
     * @return {@link org.apache.ignite.IgniteEvents} for this projection.
     */
    public final IgniteEvents events() {
        if (evts == null) {
            assert ctx != null;

            evts = new IgniteEventsImpl(ctx, this, false);
        }

        return evts;
    }

    /**
     * @return {@link org.apache.ignite.IgniteServices} for this projection.
     */
    public IgniteServices services() {
        if (svcs == null) {
            assert ctx != null;

            svcs = new IgniteServicesImpl(ctx, this, false);
        }

        return svcs;
    }

    /**
     * @return New {@link ExecutorService} for this projection (a new instance on every call).
     */
    public ExecutorService executorService() {
        assert ctx != null;

        return new GridExecutorService(this, ctx);
    }

    /** {@inheritDoc} */
    @Override public final ClusterMetrics metrics() {
        guard();

        try {
            if (nodes().isEmpty())
                throw U.convertException(U.emptyTopologyException());

            return new ClusterMetricsSnapshot(this);
        }
        finally {
            unguard();
        }
    }

    /** {@inheritDoc} */
    @Override public Collection<ClusterNode> nodes() {
        guard();

        try {
            if (ids != null) {
                if (ids.isEmpty())
                    return Collections.emptyList();
                else if (ids.size() == 1) {
                    ClusterNode node = ctx.discovery().node(F.first(ids));

                    return node != null ? Collections.singleton(node) : Collections.<ClusterNode>emptyList();
                }
                else {
                    Collection<ClusterNode> nodes = new ArrayList<>(ids.size());

                    // IDs that discovery no longer knows about are silently skipped.
                    for (UUID id : ids) {
                        ClusterNode node = ctx.discovery().node(id);

                        if (node != null)
                            nodes.add(node);
                    }

                    return nodes;
                }
            }
            else {
                Collection<ClusterNode> all = ctx.discovery().allNodes();

                return p != null ? F.view(all, p) : all;
            }
        }
        finally {
            unguard();
        }
    }

    /** {@inheritDoc} */
    @Override public final ClusterNode node(UUID id) {
        A.notNull(id, "id");

        guard();

        try {
            if (ids != null)
                return ids.contains(id) ? ctx.discovery().node(id) : null;
            else {
                ClusterNode node = ctx.discovery().node(id);

                return node != null && (p == null || p.apply(node)) ? node : null;
            }
        }
        finally {
            unguard();
        }
    }

    /** {@inheritDoc} */
    @Override public ClusterNode node() {
        return F.first(nodes());
    }

    /** {@inheritDoc} */
    @Override public final IgnitePredicate<ClusterNode> predicate() {
        return p != null ? p : F.<ClusterNode>alwaysTrue();
    }

    /** {@inheritDoc} */
    @Override public final ClusterGroup forPredicate(IgnitePredicate<ClusterNode> p) {
        A.notNull(p, "p");

        guard();

        try {
            return new ClusterGroupAdapter(this, ctx, subjId, this.p != null ? F.and(p, this.p) : p);
        }
        finally {
            unguard();
        }
    }

    /** {@inheritDoc} */
    @Override public final ClusterGroup forAttribute(String name, @Nullable final String val) {
        // Report the real parameter name (the original reported it as "n").
        A.notNull(name, "name");

        return forPredicate(new AttributeFilter(name, val));
    }

    /** {@inheritDoc} */
    @Override public final ClusterGroup forNode(ClusterNode node, ClusterNode... nodes) {
        A.notNull(node, "node");

        guard();

        try {
            Set<UUID> nodeIds;

            if (F.isEmpty(nodes))
                nodeIds = contains(node) ? Collections.singleton(node.id()) : Collections.<UUID>emptySet();
            else {
                nodeIds = U.newHashSet(nodes.length + 1);

                for (ClusterNode n : nodes)
                    if (contains(n))
                        nodeIds.add(n.id());

                if (contains(node))
                    nodeIds.add(node.id());
            }

            return new ClusterGroupAdapter(this, ctx, subjId, nodeIds);
        }
        finally {
            unguard();
        }
    }

    /** {@inheritDoc} */
    @Override public final ClusterGroup forNodes(Collection<? extends ClusterNode> nodes) {
        A.notEmpty(nodes, "nodes");

        guard();

        try {
            Set<UUID> nodeIds = U.newHashSet(nodes.size());

            for (ClusterNode n : nodes)
                if (contains(n))
                    nodeIds.add(n.id());

            return new ClusterGroupAdapter(this, ctx, subjId, nodeIds);
        }
        finally {
            unguard();
        }
    }

    /** {@inheritDoc} */
    @Override public final ClusterGroup forNodeId(UUID id, UUID... ids) {
        A.notNull(id, "id");

        guard();

        try {
            Set<UUID> nodeIds;

            if (F.isEmpty(ids))
                nodeIds = contains(id) ? Collections.singleton(id) : Collections.<UUID>emptySet();
            else {
                nodeIds = U.newHashSet(ids.length + 1);

                for (UUID id0 : ids) {
                    // Each extra ID must be checked on its own. The original tested 'id' here,
                    // so extra IDs were admitted or rejected based on the first ID's membership.
                    if (contains(id0))
                        nodeIds.add(id0);
                }

                if (contains(id))
                    nodeIds.add(id);
            }

            return new ClusterGroupAdapter(this, ctx, subjId, nodeIds);
        }
        finally {
            unguard();
        }
    }

    /** {@inheritDoc} */
    @Override public final ClusterGroup forNodeIds(Collection<UUID> ids) {
        A.notEmpty(ids, "ids");

        guard();

        try {
            Set<UUID> nodeIds = U.newHashSet(ids.size());

            for (UUID id : ids) {
                if (contains(id))
                    nodeIds.add(id);
            }

            return new ClusterGroupAdapter(this, ctx, subjId, nodeIds);
        }
        finally {
            unguard();
        }
    }

    /** {@inheritDoc} */
    @Override public final ClusterGroup forOthers(ClusterNode node, ClusterNode... nodes) {
        A.notNull(node, "node");

        return forOthers(F.concat(false, node.id(), F.nodeIds(Arrays.asList(nodes))));
    }

    /** {@inheritDoc} */
    @Override public ClusterGroup forOthers(ClusterGroup prj) {
        A.notNull(prj, "prj");

        if (ids != null) {
            guard();

            try {
                Set<UUID> nodeIds = U.newHashSet(ids.size());

                for (UUID id : ids) {
                    ClusterNode n = node(id);

                    if (n != null && !prj.predicate().apply(n))
                        nodeIds.add(id);
                }

                return new ClusterGroupAdapter(this, ctx, subjId, nodeIds);
            }
            finally {
                unguard();
            }
        }
        else
            return forPredicate(F.not(prj.predicate()));
    }

    /** {@inheritDoc} */
    @Override public final ClusterGroup forRemotes() {
        return forOthers(Collections.singleton(ctx.localNodeId()));
    }

    /**
     * Creates a projection that excludes the given node IDs.
     *
     * @param excludeIds Node IDs to exclude.
     * @return New projection.
     */
    private ClusterGroup forOthers(Collection<UUID> excludeIds) {
        assert excludeIds != null;

        if (ids != null) {
            guard();

            try {
                Set<UUID> nodeIds = U.newHashSet(ids.size());

                for (UUID id : ids) {
                    if (!excludeIds.contains(id))
                        nodeIds.add(id);
                }

                return new ClusterGroupAdapter(this, ctx, subjId, nodeIds);
            }
            finally {
                unguard();
            }
        }
        else
            return forPredicate(new OthersFilter(excludeIds));
    }

    /** {@inheritDoc} */
    @Override public final ClusterGroup forCacheNodes(@Nullable String cacheName) {
        return forPredicate(new CachesFilter(cacheName, null));
    }

    /** {@inheritDoc} */
    @Override public final ClusterGroup forDataNodes(@Nullable String cacheName) {
        return forPredicate(new CachesFilter(cacheName, CachesFilter.DATA_MODES));
    }

    /** {@inheritDoc} */
    @Override public final ClusterGroup forClientNodes(@Nullable String cacheName) {
        return forPredicate(new CachesFilter(cacheName, CachesFilter.CLIENT_MODES));
    }

    /** {@inheritDoc} */
    @Override public final ClusterGroup forStreamer(@Nullable String streamerName, @Nullable String... streamerNames) {
        return forPredicate(new StreamersFilter(streamerName, streamerNames));
    }

    /** {@inheritDoc} */
    @Override public ClusterGroup forCacheNodes(@Nullable String cacheName,
        Set<CacheDistributionMode> distributionModes) {
        return forPredicate(new CachesFilter(cacheName, distributionModes));
    }

    /** {@inheritDoc} */
    @Override public final ClusterGroup forHost(ClusterNode node) {
        A.notNull(node, "node");

        String macs = node.attribute(ATTR_MACS);

        assert macs != null;

        return forAttribute(ATTR_MACS, macs);
    }

    /** {@inheritDoc} */
    @Override public final ClusterGroup forDaemons() {
        return forPredicate(new DaemonFilter());
    }

    /** {@inheritDoc} */
    @Override public final ClusterGroup forRandom() {
        // NOTE(review): behaviour for an empty group depends on what F.rand() returns for an
        // empty collection - confirm; a null result would fail the argument checks downstream.
        return ids != null ? forNodeId(F.rand(ids)) : forNode(F.rand(nodes()));
    }

    /** {@inheritDoc} */
    @Override public ClusterGroup forOldest() {
        return new AgeProjection(this, true);
    }

    /** {@inheritDoc} */
    @Override public ClusterGroup forYoungest() {
        return new AgeProjection(this, false);
    }

    /** {@inheritDoc} */
    @Override public ClusterGroupEx forSubjectId(UUID subjId) {
        if (subjId == null)
            return this;

        guard();

        try {
            return ids != null ? new ClusterGroupAdapter(this, ctx, subjId, ids) :
                new ClusterGroupAdapter(this, ctx, subjId, p);
        }
        finally {
            unguard();
        }
    }

    /**
     * @param n Node.
     * @return Whether node belongs to this projection.
     */
    private boolean contains(ClusterNode n) {
        assert n != null;

        return ids != null ? ids.contains(n.id()) : p == null || p.apply(n);
    }

    /**
     * @param id Node ID.
     * @return Whether node belongs to this projection.
     */
    private boolean contains(UUID id) {
        assert id != null;

        if (ids != null)
            return ids.contains(id);
        else {
            ClusterNode n = ctx.discovery().node(id);

            return n != null && (p == null || p.apply(n));
        }
    }

    /** {@inheritDoc} */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        U.writeString(out, gridName);
        U.writeUuid(out, subjId);

        // Flag tells the reader whether node IDs or the predicate follow.
        out.writeBoolean(ids != null);

        if (ids != null)
            out.writeObject(ids);
        else
            out.writeObject(p);
    }

    /** {@inheritDoc} */
    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        gridName = U.readString(in);
        subjId = U.readUuid(in);

        if (in.readBoolean())
            ids = (Set<UUID>)in.readObject();
        else
            p = (IgnitePredicate<ClusterNode>)in.readObject();
    }

    /**
     * Reconstructs object on unmarshalling: a new adapter bound to the local grid with the
     * deserialized grid name, or the grid itself if neither IDs nor a predicate were read.
     *
     * @return Reconstructed object.
     * @throws ObjectStreamException Thrown in case of unmarshalling error.
     */
    protected Object readResolve() throws ObjectStreamException {
        try {
            IgniteKernal g = IgnitionEx.gridx(gridName);

            ClusterGroup grp = g.cluster();

            return ids != null ? new ClusterGroupAdapter(grp, g.context(), subjId, ids) :
                p != null ? new ClusterGroupAdapter(grp, g.context(), subjId, p) : g;
        }
        catch (IllegalStateException e) {
            throw U.withCause(new InvalidObjectException(e.getMessage()), e);
        }
    }

    /**
     * Accepts nodes whose cache attributes contain the given cache name and, optionally, one of
     * the given distribution modes.
     */
    private static class CachesFilter implements IgnitePredicate<ClusterNode> {
        /** Distribution modes selected by {@code forDataNodes}. */
        private static final Set<CacheDistributionMode> DATA_MODES = EnumSet.of(CacheDistributionMode.NEAR_PARTITIONED,
            CacheDistributionMode.PARTITIONED_ONLY);

        /** Distribution modes selected by {@code forClientNodes}. */
        private static final Set<CacheDistributionMode> CLIENT_MODES = EnumSet.of(CacheDistributionMode.CLIENT_ONLY,
            CacheDistributionMode.NEAR_ONLY);

        /** Serialization version. */
        private static final long serialVersionUID = 0L;

        /** Cache name. */
        private final String cacheName;

        /** Accepted distribution modes; {@code null} accepts any mode. */
        private final Set<CacheDistributionMode> distributionMode;

        /**
         * @param cacheName Cache name.
         * @param distributionMode Filter by {@link org.apache.ignite.configuration.CacheConfiguration#getDistributionMode()}.
         */
        private CachesFilter(@Nullable String cacheName, @Nullable Set<CacheDistributionMode> distributionMode) {
            this.cacheName = cacheName;
            this.distributionMode = distributionMode;
        }

        /** {@inheritDoc} */
        @Override public boolean apply(ClusterNode n) {
            GridCacheAttributes[] caches = n.attribute(ATTR_CACHE);

            if (caches != null) {
                for (GridCacheAttributes attrs : caches) {
                    if (Objects.equals(cacheName, attrs.cacheName())
                        && (distributionMode == null || distributionMode.contains(attrs.partitionedTaxonomy())))
                        return true;
                }
            }

            return false;
        }
    }

    /**
     * Accepts nodes that have every one of the given streamers.
     */
    private static class StreamersFilter implements IgnitePredicate<ClusterNode> {
        /** Serialization version. */
        private static final long serialVersionUID = 0L;

        /** Streamer name. */
        private final String streamerName;

        /** Streamer names. */
        private final String[] streamerNames;

        /**
         * @param streamerName Streamer name.
         * @param streamerNames Streamer names.
         */
        private StreamersFilter(@Nullable String streamerName, @Nullable String[] streamerNames) {
            this.streamerName = streamerName;
            this.streamerNames = streamerNames;
        }

        /** {@inheritDoc} */
        @Override public boolean apply(ClusterNode n) {
            if (!U.hasStreamer(n, streamerName))
                return false;

            if (!F.isEmpty(streamerNames))
                for (String sn : streamerNames)
                    if (!U.hasStreamer(n, sn))
                        return false;

            return true;
        }
    }

    /**
     * Accepts nodes by attribute: with a {@code null} value the attribute only has to be present,
     * otherwise its value must equal the given one.
     */
    private static class AttributeFilter implements IgnitePredicate<ClusterNode> {
        /** Serialization version. */
        private static final long serialVersionUID = 0L;

        /** Name. */
        private final String name;

        /** Value. */
        private final String val;

        /**
         * @param name Name.
         * @param val Value.
         */
        private AttributeFilter(String name, String val) {
            this.name = name;
            this.val = val;
        }

        /** {@inheritDoc} */
        @Override public boolean apply(ClusterNode n) {
            return val == null ? n.attributes().containsKey(name) : val.equals(n.attribute(name));
        }
    }

    /**
     * Accepts daemon nodes only.
     */
    private static class DaemonFilter implements IgnitePredicate<ClusterNode> {
        /** Serialization version. */
        private static final long serialVersionUID = 0L;

        /** {@inheritDoc} */
        @Override public boolean apply(ClusterNode n) {
            return n.isDaemon();
        }
    }

    /**
     * Accepts nodes whose ID is not in the given collection.
     */
    private static class OthersFilter implements IgnitePredicate<ClusterNode> {
        /** Serialization version. */
        private static final long serialVersionUID = 0L;

        /** IDs of excluded nodes. */
        private final Collection<UUID> nodeIds;

        /**
         * @param nodeIds Node IDs.
         */
        private OthersFilter(Collection<UUID> nodeIds) {
            this.nodeIds = nodeIds;
        }

        /** {@inheritDoc} */
        @Override public boolean apply(ClusterNode n) {
            return !nodeIds.contains(n.id());
        }
    }

    /**
     * Age-based projection: resolves to the single oldest or youngest node of the parent
     * projection and re-selects it whenever the discovery topology version changes.
     * <p>
     * NOTE(review): this class does not externalize {@code isOldest} and inherits
     * {@link ClusterGroupAdapter#readResolve()}, so a deserialized instance appears to come back
     * as a plain {@link ClusterGroupAdapter} - confirm whether that is intended.
     */
    private static class AgeProjection extends ClusterGroupAdapter {
        /** Serialization version. */
        private static final long serialVersionUID = 0L;

        /** Oldest flag. */
        private boolean isOldest;

        /** Selected node. */
        private volatile ClusterNode node;

        /** Last topology version. */
        private volatile long lastTopVer;

        /**
         * Required for {@link Externalizable}.
         */
        public AgeProjection() {
            // No-op.
        }

        /**
         * @param prj Parent projection.
         * @param isOldest Oldest flag.
         */
        private AgeProjection(ClusterGroupAdapter prj, boolean isOldest) {
            super(prj.parent, prj.ctx, prj.subjId, prj.p, prj.ids);

            this.isOldest = isOldest;

            reset();
        }

        /**
         * Resets node: records the current topology version and re-selects the node.
         */
        private synchronized void reset() {
            guard();

            try {
                lastTopVer = ctx.discovery().topologyVersion();

                this.node = isOldest ? U.oldest(super.nodes(), null) : U.youngest(super.nodes(), null);
            }
            finally {
                unguard();
            }
        }

        /** {@inheritDoc} */
        @Override public ClusterNode node() {
            if (ctx.discovery().topologyVersion() != lastTopVer)
                reset();

            return node;
        }

        /** {@inheritDoc} */
        @Override public Collection<ClusterNode> nodes() {
            if (ctx.discovery().topologyVersion() != lastTopVer)
                reset();

            // Read the volatile field once so the null check and the result use the same value.
            ClusterNode node = this.node;

            return node == null ? Collections.<ClusterNode>emptyList() : Collections.singletonList(node);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupEx.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupEx.java new file mode 100644 index 0000000..f11b781 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupEx.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cluster; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Internal projection interface adding subject-ID and distribution-mode projections to {@link ClusterGroup}. + */ +public interface ClusterGroupEx extends ClusterGroup { + /** + * Creates projection for specified subject ID. + * + * @param subjId Subject ID. + * @return Internal projection. + */ + public ClusterGroupEx forSubjectId(UUID subjId); + + /** + * @param cacheName Cache name. + * @param distributionModes Cache distribution modes to select nodes by. + * @return Cluster group. 
+ */ + public ClusterGroup forCacheNodes(@Nullable String cacheName, Set<CacheDistributionMode> distributionModes); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterNodeLocalMapImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterNodeLocalMapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterNodeLocalMapImpl.java new file mode 100644 index 0000000..948cae9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterNodeLocalMapImpl.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.cluster; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.concurrent.*; + +/** + * Node-local map built on {@link ConcurrentHashMap8}; its externalized form is only the grid name, and unmarshalling resolves to that grid's node-local map. + */ +public class ClusterNodeLocalMapImpl<K, V> extends ConcurrentHashMap8<K, V> implements ClusterNodeLocalMap<K, V>, + Externalizable { + /** Serialization version. */ + private static final long serialVersionUID = 0L; + + /** Grid name set by {@link #readExternal} and consumed (then cleared) by {@link #readResolve}. */ + private static final ThreadLocal<String> stash = new ThreadLocal<>(); + + /** Kernal context (not set when created through the no-arg constructor). */ + private GridKernalContext ctx; + + /** + * No-arg constructor is required by externalization. + */ + public ClusterNodeLocalMapImpl() { + // No-op. + } + + /** + * Creates a map bound to the given kernal context. + * @param ctx Kernal context. + */ + ClusterNodeLocalMapImpl(GridKernalContext ctx) { + assert ctx != null; + + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Nullable @Override public V addIfAbsent(K key, @Nullable Callable<V> dflt) { + return F.addIfAbsent(this, key, dflt); + } + + /** {@inheritDoc} */ + @Override public V addIfAbsent(K key, V val) { + return F.addIfAbsent(this, key, val); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, ctx.gridName()); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + stash.set(U.readString(in)); + } + + /** + * Reconstructs object on unmarshalling. + * + * @return Reconstructed object. 
+ * @throws ObjectStreamException Thrown in case of unmarshalling error. 
+ */ + protected Object readResolve() throws ObjectStreamException { + try { + return IgnitionEx.gridx(stash.get()).cluster().nodeLocalMap(); + } + catch (IllegalStateException e) { + throw U.withCause(new InvalidObjectException(e.getMessage()), e); + } + finally { + stash.remove(); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ClusterNodeLocalMapImpl.class, this); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java new file mode 100644 index 0000000..2095e70 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.cluster; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * {@link IgniteCluster} facade that delegates to an {@link IgniteClusterImpl}; {@code startNodes} calls go through {@code startNodesAsync} and {@code saveOrGet}. + */ +public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> implements IgniteCluster { + /** Cluster that all calls are delegated to. */ + private final IgniteClusterImpl cluster; + + /** + * @param cluster Cluster. + */ + public IgniteClusterAsyncImpl(IgniteClusterImpl cluster) { + super(true); + + this.cluster = cluster; + } + + /** {@inheritDoc} */ + @Override public ClusterNode localNode() { + return cluster.localNode(); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forLocal() { + return cluster.forLocal(); + } + + /** {@inheritDoc} */ + @Override public <K, V> ClusterNodeLocalMap<K, V> nodeLocalMap() { + return cluster.nodeLocalMap(); + } + + /** {@inheritDoc} */ + @Override public boolean pingNode(UUID nodeId) { + return cluster.pingNode(nodeId); + } + + /** {@inheritDoc} */ + @Override public long topologyVersion() { + return cluster.topologyVersion(); + } + + /** {@inheritDoc} */ + @Nullable @Override public Collection<ClusterNode> topology(long topVer) { + return cluster.topology(topVer); + } + + /** {@inheritDoc} */ + @Override public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable String cacheName, + @Nullable Collection<? 
extends K> keys) { + return cluster.mapKeysToNodes(cacheName, keys); + } + + /** {@inheritDoc} */ + @Nullable @Override public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key) { + return cluster.mapKeyToNode(cacheName, key); + } + + /** {@inheritDoc} */ + @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(File file, + boolean restart, + int timeout, + int maxConn) + { + try { + return saveOrGet(cluster.startNodesAsync(file, restart, timeout, maxConn)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public Collection<GridTuple3<String, Boolean, String>> startNodes( + Collection<Map<String, Object>> hosts, + @Nullable Map<String, Object> dflts, + boolean restart, + int timeout, + int maxConn) + { + try { + return saveOrGet(cluster.startNodesAsync(hosts, dflts, restart, timeout, maxConn)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public void stopNodes() { + cluster.stopNodes(); + } + + /** {@inheritDoc} */ + @Override public void stopNodes(Collection<UUID> ids) { + cluster.stopNodes(ids); + } + + /** {@inheritDoc} */ + @Override public void restartNodes() { + cluster.restartNodes(); + } + + /** {@inheritDoc} */ + @Override public void restartNodes(Collection<UUID> ids) { + cluster.restartNodes(ids); + } + + /** {@inheritDoc} */ + @Override public void resetMetrics() { + cluster.resetMetrics(); + } + + /** {@inheritDoc} */ + @Override public Ignite ignite() { + return cluster.ignite(); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forNodes(Collection<? extends ClusterNode> nodes) { + return cluster.forNodes(nodes); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forNode(ClusterNode node, ClusterNode... nodes) { + return cluster.forNode(node, nodes); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forOthers(ClusterNode node, ClusterNode... 
nodes) { + return cluster.forOthers(node, nodes); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forOthers(ClusterGroup prj) { + return cluster.forOthers(prj); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forNodeIds(Collection<UUID> ids) { + return cluster.forNodeIds(ids); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forNodeId(UUID id, UUID... ids) { + return cluster.forNodeId(id, ids); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forPredicate(IgnitePredicate<ClusterNode> p) { + return cluster.forPredicate(p); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forAttribute(String name, @Nullable String val) { + return cluster.forAttribute(name, val); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forCacheNodes(String cacheName) { + return cluster.forCacheNodes(cacheName); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forDataNodes(String cacheName) { + return cluster.forDataNodes(cacheName); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forClientNodes(String cacheName) { + return cluster.forClientNodes(cacheName); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forStreamer(String streamerName, @Nullable String... 
streamerNames) { + return cluster.forStreamer(streamerName, streamerNames); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forRemotes() { + return cluster.forRemotes(); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forHost(ClusterNode node) { + return cluster.forHost(node); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forDaemons() { + return cluster.forDaemons(); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forRandom() { + return cluster.forRandom(); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forOldest() { + return cluster.forOldest(); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forYoungest() { + return cluster.forYoungest(); + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> nodes() { + return cluster.nodes(); + } + + /** {@inheritDoc} */ + @Nullable @Override public ClusterNode node(UUID id) { + return cluster.node(id); + } + + /** {@inheritDoc} */ + @Nullable @Override public ClusterNode node() { + return cluster.node(); + } + + /** {@inheritDoc} */ + @Override public IgnitePredicate<ClusterNode> predicate() { + return cluster.predicate(); + } + + /** {@inheritDoc} */ + @Override public ClusterMetrics metrics() { + return cluster.metrics(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterEx.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterEx.java new file mode 100644 index 0000000..4c06031 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterEx.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cluster; + +import org.apache.ignite.*; + +/** + * Internal cluster interface combining {@link IgniteCluster} and {@link ClusterGroupEx}; declares no members of its own. + */ +public interface IgniteClusterEx extends IgniteCluster, ClusterGroupEx { + // No-op. +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java new file mode 100644 index 0000000..95e5b5c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cluster; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.nodestart.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.IgniteNodeAttributes.*; +import static org.apache.ignite.internal.util.nodestart.IgniteNodeStartUtils.*; + +/** + * + */ +public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClusterEx, Externalizable { + /** */ + private IgniteConfiguration cfg; + + /** Node local store. 
*/ + @GridToStringExclude + private ClusterNodeLocalMap nodeLoc; + + /** {@inheritDoc} */ + @Override public void setKernalContext(GridKernalContext ctx) { + super.setKernalContext(ctx); + + cfg = ctx.config(); + + nodeLoc = new ClusterNodeLocalMapImpl(ctx); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forLocal() { + guard(); + + try { + return new ClusterGroupAdapter(this, ctx, null, Collections.singleton(cfg.getNodeId())); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public ClusterNode localNode() { + guard(); + + try { + ClusterNode node = ctx.discovery().localNode(); + + assert node != null; + + return node; + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <K, V> ClusterNodeLocalMap<K, V> nodeLocalMap() { + guard(); + + try { + return nodeLoc; + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public boolean pingNode(UUID nodeId) { + A.notNull(nodeId, "nodeId"); + + guard(); + + try { + return ctx.discovery().pingNode(nodeId); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public long topologyVersion() { + guard(); + + try { + return ctx.discovery().topologyVersion(); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> topology(long topVer) throws UnsupportedOperationException { + guard(); + + try { + return ctx.discovery().topology(topVer); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable String cacheName, + @Nullable Collection<? 
extends K> keys) + throws IgniteException + { + if (F.isEmpty(keys)) + return Collections.emptyMap(); + + guard(); + + try { + return ctx.affinity().mapKeysToNodes(cacheName, keys); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key) throws IgniteException { + A.notNull(key, "key"); + + guard(); + + try { + return ctx.affinity().mapKeyToNode(cacheName, key); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(File file, + boolean restart, + int timeout, + int maxConn) + throws IgniteException + { + try { + return startNodesAsync(file, restart, timeout, maxConn).get(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(Collection<Map<String, Object>> hosts, + @Nullable Map<String, Object> dflts, + boolean restart, + int timeout, + int maxConn) + throws IgniteException + { + try { + return startNodesAsync(hosts, dflts, restart, timeout, maxConn).get(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public void stopNodes() throws IgniteException { + guard(); + + try { + compute().execute(IgniteKillTask.class, false); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public void stopNodes(Collection<UUID> ids) throws IgniteException { + guard(); + + try { + ctx.grid().compute(forNodeIds(ids)).execute(IgniteKillTask.class, false); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public void restartNodes() throws IgniteException { + guard(); + + try { + compute().execute(IgniteKillTask.class, true); + } + 
finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public void restartNodes(Collection<UUID> ids) throws IgniteException { + guard(); + + try { + ctx.grid().compute(forNodeIds(ids)).execute(IgniteKillTask.class, true); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public void resetMetrics() { + guard(); + + try { + ctx.jobMetric().reset(); + ctx.io().resetMetrics(); + ctx.task().resetMetrics(); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public IgniteCluster withAsync() { + return new IgniteClusterAsyncImpl(this); + } + + /** {@inheritDoc} */ + @Override public boolean isAsync() { + return false; + } + + /** {@inheritDoc} */ + @Override public <R> IgniteFuture<R> future() { + throw new IllegalStateException("Asynchronous mode is not enabled."); + } + + /** + * @param file Configuration file. + * @param restart Whether to stop existing nodes. + * @param timeout Connection timeout. + * @param maxConn Number of parallel SSH connections to one host. + * @return Future with results. + * @see IgniteCluster#startNodes(java.io.File, boolean, int, int) + */ + IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync(File file, + boolean restart, + int timeout, + int maxConn) + { + A.notNull(file, "file"); + A.ensure(file.exists(), "file doesn't exist."); + A.ensure(file.isFile(), "file is a directory."); + + try { + IgniteBiTuple<Collection<Map<String, Object>>, Map<String, Object>> t = parseFile(file); + + return startNodesAsync(t.get1(), t.get2(), restart, timeout, maxConn); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(ctx, e); + } + } + + /** + * @param hosts Startup parameters. + * @param dflts Default values. + * @param restart Whether to stop existing nodes. + * @param timeout Connection timeout in milliseconds. + * @param maxConn Number of parallel SSH connections to one host. + * @return Future with results. 
+ * @see IgniteCluster#startNodes(java.util.Collection, java.util.Map, boolean, int, int) + */ + IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync( + Collection<Map<String, Object>> hosts, + @Nullable Map<String, Object> dflts, + boolean restart, + int timeout, + int maxConn) + { + A.notNull(hosts, "hosts"); + + guard(); + + try { + IgniteSshProcessor sshProcessor = IgniteComponentType.SSH.create(false); + + Map<String, Collection<IgniteRemoteStartSpecification>> specsMap = specifications(hosts, dflts); + + Map<String, ConcurrentLinkedQueue<IgniteNodeCallable>> runMap = new HashMap<>(); + + int nodeCallCnt = 0; + + for (String host : specsMap.keySet()) { + InetAddress addr; + + try { + addr = InetAddress.getByName(host); + } + catch (UnknownHostException e) { + throw new IgniteCheckedException("Invalid host name: " + host, e); + } + + Collection<? extends ClusterNode> neighbors = null; + + if (addr.isLoopbackAddress()) + neighbors = neighbors(); + else { + for (Collection<ClusterNode> p : U.neighborhood(nodes()).values()) { + ClusterNode node = F.first(p); + + if (node.<String>attribute(ATTR_IPS).contains(addr.getHostAddress())) { + neighbors = p; + + break; + } + } + } + + int startIdx = 1; + + if (neighbors != null) { + if (restart && !neighbors.isEmpty()) { + try { + ctx.grid().compute(forNodes(neighbors)).execute(IgniteKillTask.class, false); + } + catch (ClusterGroupEmptyException ignored) { + // No-op, nothing to restart. 
+ } + } + else + startIdx = neighbors.size() + 1; + } + + ConcurrentLinkedQueue<IgniteNodeCallable> nodeRuns = new ConcurrentLinkedQueue<>(); + + runMap.put(host, nodeRuns); + + for (IgniteRemoteStartSpecification spec : specsMap.get(host)) { + assert spec.host().equals(host); + + for (int i = startIdx; i <= spec.nodes(); i++) { + nodeRuns.add(sshProcessor.nodeStartCallable(spec, timeout)); + + nodeCallCnt++; + } + } + } + + // If there is nothing to start, return finished future with empty result. + if (nodeCallCnt == 0) + return new GridFinishedFuture<Collection<GridTuple3<String, Boolean, String>>>( + ctx, Collections.<GridTuple3<String, Boolean, String>>emptyList()); + + // Exceeding max line width for readability. + GridCompoundFuture<GridTuple3<String, Boolean, String>, Collection<GridTuple3<String, Boolean, String>>> + fut = new GridCompoundFuture<>( + ctx, + CU.<GridTuple3<String, Boolean, String>>objectsReducer() + ); + + AtomicInteger cnt = new AtomicInteger(nodeCallCnt); + + // Limit maximum simultaneous connection number per host. + for (ConcurrentLinkedQueue<IgniteNodeCallable> queue : runMap.values()) { + for (int i = 0; i < maxConn; i++) { + if (!runNextNodeCallable(queue, fut, cnt)) + break; + } + } + + return fut; + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(ctx, e); + } + finally { + unguard(); + } + } + + /** + * Gets all grid nodes that reside on the same physical computer as the local grid node. + * Local grid node is excluded. + * <p> + * Detection of the same physical computer is based on comparing the set of network interface MACs. + * If two nodes have the same set of MACs, Ignite considers these nodes to be running on the same + * physical computer. + * @return Grid nodes that reside on the same physical computer as the local grid node. 
+ */ + private Collection<ClusterNode> neighbors() { + Collection<ClusterNode> neighbors = new ArrayList<>(1); + + String macs = localNode().attribute(ATTR_MACS); + + assert macs != null; + + for (ClusterNode n : forOthers(localNode()).nodes()) { + if (macs.equals(n.attribute(ATTR_MACS))) + neighbors.add(n); + } + + return neighbors; + } + + /** + * Runs the next callable from the host node start queue. + * + * @param queue Queue of tasks to poll from. + * @param comp Compound future that comprises all started node tasks. + * @param cnt Atomic counter to check if all futures are added to compound future. + * @return {@code True} if task was started, {@code false} if queue was empty. + */ + private boolean runNextNodeCallable(final ConcurrentLinkedQueue<IgniteNodeCallable> queue, + final GridCompoundFuture<GridTuple3<String, Boolean, String>, Collection<GridTuple3<String, Boolean, String>>> + comp, + final AtomicInteger cnt) + { + IgniteNodeCallable call = queue.poll(); + + if (call == null) + return false; + + IgniteInternalFuture<GridTuple3<String, Boolean, String>> fut = ctx.closure().callLocalSafe(call, true); + + comp.add(fut); + + if (cnt.decrementAndGet() == 0) + comp.markInitialized(); + + fut.listenAsync(new CI1<IgniteInternalFuture<GridTuple3<String, Boolean, String>>>() { + @Override public void apply(IgniteInternalFuture<GridTuple3<String, Boolean, String>> f) { + runNextNodeCallable(queue, comp, cnt); + } + }); + + return true; + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + ctx = (GridKernalContext)in.readObject(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ctx); + } + + /** {@inheritDoc} */ + @Override protected Object readResolve() throws ObjectStreamException { + return ctx.grid().cluster(); + } + + /** {@inheritDoc} */ + public String toString() { + return "IgniteCluster [igniteName=" + 
ctx.gridName() + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteKillTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteKillTask.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteKillTask.java new file mode 100644 index 0000000..bd81bc3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteKillTask.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cluster; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.processors.task.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +import static org.apache.ignite.internal.IgniteNodeAttributes.*; + +/** + * Special kill task that never fails over jobs. 
+ */ +@GridInternal +class IgniteKillTask extends ComputeTaskAdapter<Boolean, Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** Restart flag. */ + private boolean restart; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Boolean restart) { + assert restart != null; + + this.restart = restart; + + Map<ComputeJob, ClusterNode> jobs = U.newHashMap(subgrid.size()); + + for (ClusterNode n : subgrid) + if (!daemon(n)) + jobs.put(new IgniteKillJob(), n); + + return jobs; + } + + /** + * Checks if given node is a daemon node. + * + * @param n Node. + * @return Whether node is daemon. + */ + private boolean daemon(ClusterNode n) { + return "true".equalsIgnoreCase(n.<String>attribute(ATTR_DAEMON)); + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Override public Void reduce(List<ComputeJobResult> results) { + return null; + } + + /** + * Kill job. 
+ */ + private class IgniteKillJob extends ComputeJobAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public Object execute() { + if (restart) + new Thread(new Runnable() { + @Override public void run() { + G.restart(true); + } + }, + "ignite-restarter").start(); + else + new Thread(new Runnable() { + @Override public void run() { + G.kill(true); + } + }, + "ignite-stopper").start(); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java b/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java index 96793af..1d0ef09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.executor; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.compute.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 808c62f..18dad2e 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -194,7 +194,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public ClusterGroup gridProjection() { - return ctx.grid().forCacheNodes(name()); + return ctx.grid().cluster().forCacheNodes(name()); } /** @@ -1541,7 +1541,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, @Override public void clear(long timeout) throws IgniteCheckedException { try { // Send job to remote nodes only. - Collection<ClusterNode> nodes = ctx.grid().forCacheNodes(name()).forRemotes().nodes(); + Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes(); IgniteInternalFuture<Object> fut = null; @@ -1571,7 +1571,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync() { - Collection<ClusterNode> nodes = ctx.grid().forCacheNodes(name()).nodes(); + Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).nodes(); if (!nodes.isEmpty()) { IgniteInternalFuture<Object> fut = @@ -4012,7 +4012,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (replaceExisting) { if (ctx.store().isLocalStore()) { - Collection<ClusterNode> nodes = ctx.grid().forDataNodes(name()).nodes(); + Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); if (nodes.isEmpty()) return new GridFinishedFuture<>(ctx.kernalContext()); @@ -4033,7 +4033,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } } else { - Collection<ClusterNode> nodes = ctx.grid().forDataNodes(name()).nodes(); + Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); if (nodes.isEmpty()) return new 
GridFinishedFuture<>(ctx.kernalContext()); @@ -4182,7 +4182,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, PeekModes modes = parsePeekModes(peekModes); - ClusterGroup grp = modes.near ? ctx.grid().forCacheNodes(name(), SIZE_NODES) : ctx.grid().forDataNodes(name()); + IgniteClusterEx cluster = ctx.grid().cluster(); + + ClusterGroup grp = modes.near ? cluster.forCacheNodes(name(), SIZE_NODES) : cluster.forDataNodes(name()); Collection<ClusterNode> nodes = grp.nodes(); @@ -4511,7 +4513,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, private int globalSize(boolean primaryOnly) throws IgniteCheckedException { try { // Send job to remote nodes only. - Collection<ClusterNode> nodes = ctx.grid().forCacheNodes(name()).forRemotes().nodes(); + Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes(); IgniteInternalFuture<Collection<Integer>> fut = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 9489e36..19bc444 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -346,7 +346,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V qry.getBufferSize(), qry.getTimeInterval(), qry.isAutoUnsubscribe(), - loc ? ctx.grid().forLocal() : null); + loc ? 
ctx.grid().cluster().forLocal() : null); final QueryCursor<Cache.Entry<K, V>> cur; @@ -383,11 +383,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** - * @param local Enforce local. + * @param loc Enforce local. * @return Local node cluster group. */ - private ClusterGroup projection(boolean local) { - return local ? ctx.kernalContext().grid().forLocal() : null; + private ClusterGroup projection(boolean loc) { + return loc ? ctx.kernalContext().grid().cluster().forLocal() : null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 8c73cee..dc82e83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -143,7 +143,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter topVer = ctx.affinity().affinityTopologyVersion(); // Send job to all data nodes. - Collection<ClusterNode> nodes = ctx.grid().forDataNodes(name()).nodes(); + Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); if (!nodes.isEmpty()) { ctx.closures().callAsyncNoFailover(BROADCAST, @@ -174,7 +174,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * @param topVer Topology version. 
*/ private void removeAllAsync(final GridFutureAdapter<Void> opFut, final long topVer) { - Collection<ClusterNode> nodes = ctx.grid().forDataNodes(name()).nodes(); + Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); if (!nodes.isEmpty()) { IgniteInternalFuture<?> rmvFut = ctx.closures().callAsyncNoFailover(BROADCAST, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 67c1208..362077f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -1299,7 +1299,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (log.isDebugEnabled()) log.debug("Record [key=" + key + ", val=" + val + ", incBackups=" + incBackups + "priNode=" + U.id8(CU.primaryNode(cctx, key).id()) + - ", node=" + U.id8(cctx.grid().localNode().id()) + ']'); + ", node=" + U.id8(cctx.localNode().id()) + ']'); if (val == null) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 8480211..0643c0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -278,7 +278,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K true, false, true, - loc ? cctx.grid().forLocal() : null); + loc ? cctx.grid().cluster().forLocal() : null); } public void cancelInternalQuery(UUID routineId) { @@ -347,7 +347,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K cctx.checkSecurity(GridSecurityPermission.CACHE_READ); if (grp == null) - grp = cctx.kernalContext().grid(); + grp = cctx.kernalContext().grid().cluster(); Collection<ClusterNode> nodes = grp.nodes(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index 10ccfd0..ced8d1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@ -178,7 +178,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay log = U.logger(ctx, logRef, IgniteDataLoaderImpl.class); - ClusterNode node = F.first(ctx.grid().forCacheNodes(cacheName).nodes()); + ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes()); if (node == 
null) throw new IllegalStateException("Cache doesn't exist: " + cacheName); @@ -294,7 +294,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay if (allow == allowOverwrite()) return; - ClusterNode node = F.first(ctx.grid().forCacheNodes(cacheName).nodes()); + ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes()); if (node == null) throw new IgniteException("Failed to get node for cache: " + cacheName); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index 7e68ca4..5d96086 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -118,7 +118,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); - qry.projection(ctx.grid().forNodes(nodes)); + qry.projection(ctx.grid().cluster().forNodes(nodes)); Iterable<Integer> col = (Iterable<Integer>)qry.execute(new SumReducer()).get(); @@ -349,7 +349,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); - qry.projection(ctx.grid().forNodes(nodes)); + qry.projection(ctx.grid().cluster().forNodes(nodes)); CacheQueryFuture<Map.Entry<T, ?>> fut = qry.execute(); 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java index 64ab391..983dd55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java @@ -347,7 +347,7 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { chain(resultWrapper((CacheProjection<Object, Object>)prj, key)); } else { - ClusterGroup prj = ctx.grid().forPredicate(F.nodeForNodeId(destId)); + ClusterGroup prj = ctx.grid().cluster().forPredicate(F.nodeForNodeId(destId)); ctx.task().setThreadContext(TC_NO_FAILOVER, true); @@ -385,7 +385,7 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { return op.apply(cache, ctx).chain(resultWrapper(cache, key)); } else { - ClusterGroup prj = ctx.grid().forPredicate(F.nodeForNodeId(destId)); + ClusterGroup prj = ctx.grid().cluster().forPredicate(F.nodeForNodeId(destId)); ctx.task().setThreadContext(TC_NO_FAILOVER, true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java index bcf6935..93e3363 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java @@ -130,7 +130,7 @@ public class GridCacheQueryCommandHandler extends GridRestCommandHandlerAdapter ctx.task().setThreadContext(TC_NO_FAILOVER, true); - return ctx.closure().callAsync(BALANCE, c, ctx.grid().forNodeId(destId).nodes()); + return ctx.closure().callAsync(BALANCE, c, ctx.grid().cluster().forNodeId(destId).nodes()); } } @@ -144,7 +144,7 @@ public class GridCacheQueryCommandHandler extends GridRestCommandHandlerAdapter IgniteInternalFuture<Collection<Object>> fut = ctx.closure().callAsync(BROADCAST, Arrays.asList(c), - ctx.grid().forCacheNodes(cacheName).nodes()); + ctx.grid().cluster().forCacheNodes(cacheName).nodes()); return fut.chain(new C1<IgniteInternalFuture<Collection<Object>>, GridRestResponse>() { @Override public GridRestResponse apply(IgniteInternalFuture<Collection<Object>> fut) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java index f9004c3..78b6bd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java @@ -199,7 +199,7 @@ 
public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter { else { // Using predicate instead of node intentionally // in order to provide user well-structured EmptyProjectionException. - ClusterGroup prj = ctx.grid().forPredicate(F.nodeForNodeId(req.destinationId())); + ClusterGroup prj = ctx.grid().cluster().forPredicate(F.nodeForNodeId(req.destinationId())); ctx.task().setThreadContext(TC_NO_FAILOVER, true); @@ -612,7 +612,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter { /** {@inheritDoc} */ @Override public Object call() throws Exception { - return g.compute(g.forSubjectId(clientId)).execute( + return g.compute(g.cluster().forSubjectId(clientId)).execute( name, !params.isEmpty() ? params.size() == 1 ? params.get(0) : params.toArray() : null); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java index a986d5e..e32f6f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java @@ -104,7 +104,7 @@ public class GridTopologyCommandHandler extends GridRestCommandHandlerAdapter { // Always refresh topology so client see most up-to-date view. 
ctx.discovery().alive(id); - node = ctx.grid().node(id); + node = ctx.grid().cluster().node(id); if (ip != null && node != null && !containsIp(node.addresses(), ip)) node = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java index 7d112eb..c2a992d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java @@ -189,7 +189,7 @@ public class GridStreamerContextImpl implements StreamerContext { ClusterGroup prj = streamPrj.get(); if (prj == null) { - prj = ctx.grid().forStreamer(streamer.name()); + prj = ctx.grid().cluster().forStreamer(streamer.name()); streamPrj.compareAndSet(null, prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java index a49d568..f425d75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java @@ -82,10 +82,11 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple< /** * @param subJob Sub job to execute asynchronously. 
+ * @param idx Index. * @return {@code true} If subJob was not completed and this job should be suspended. */ private boolean callAsync(IgniteCallable<Integer> subJob, int idx) { - IgniteCompute compute = ignite.compute(ignite.forCacheNodes(cacheName)).withAsync(); + IgniteCompute compute = ignite.compute(ignite.cluster().forCacheNodes(cacheName)).withAsync(); compute.call(subJob); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeCancelSessionsTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeCancelSessionsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeCancelSessionsTask.java index 02fd4f9..bf828e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeCancelSessionsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeCancelSessionsTask.java @@ -66,7 +66,7 @@ public class VisorComputeCancelSessionsTask extends VisorMultiNodeTask<Map<UUID, Set<IgniteUuid> sesIds = arg.get(ignite.localNode().id()); if (sesIds != null && !sesIds.isEmpty()) { - IgniteCompute compute = ignite.compute(ignite.forLocal()); + IgniteCompute compute = ignite.compute(ignite.cluster().forLocal()); Map<IgniteUuid, ComputeTaskFuture<Object>> futs = compute.activeTaskFutures(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeResetMetricsTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeResetMetricsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeResetMetricsTask.java index c495964..6b6cc61 
100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeResetMetricsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeResetMetricsTask.java @@ -51,7 +51,7 @@ public class VisorComputeResetMetricsTask extends VisorOneNodeTask<Void, Void> { /** {@inheritDoc} */ @Override protected Void run(Void arg) { - ignite.resetMetrics(); + ignite.cluster().resetMetrics(); return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeToggleMonitoringTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeToggleMonitoringTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeToggleMonitoringTask.java index 12f42d3..b857eee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeToggleMonitoringTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorComputeToggleMonitoringTask.java @@ -75,7 +75,7 @@ public class VisorComputeToggleMonitoringTask extends if (checkExplicitTaskMonitoring(ignite)) return true; else { - ClusterNodeLocalMap<String, VisorComputeMonitoringHolder> storage = ignite.nodeLocalMap(); + ClusterNodeLocalMap<String, VisorComputeMonitoringHolder> storage = ignite.cluster().nodeLocalMap(); VisorComputeMonitoringHolder holder = storage.get(COMPUTE_MONITORING_HOLDER_KEY); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java index c32a23b..e576704 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java @@ -84,7 +84,7 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa res.taskMonitoringEnabled(arg.taskMonitoringEnabled()); if (arg.taskMonitoringEnabled()) { - ClusterNodeLocalMap<String, VisorComputeMonitoringHolder> storage = ignite.nodeLocalMap(); + ClusterNodeLocalMap<String, VisorComputeMonitoringHolder> storage = ignite.cluster().nodeLocalMap(); VisorComputeMonitoringHolder holder = storage.get(COMPUTE_MONITORING_HOLDER_KEY); @@ -221,7 +221,7 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa VisorNodeDataCollectorTaskArg arg) { res.gridName(ignite.name()); - res.topologyVersion(ignite.topologyVersion()); + res.topologyVersion(ignite.cluster().topologyVersion()); long start0 = U.currentTimeMillis();