# ignite-301
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a67b72ca Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a67b72ca Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a67b72ca Branch: refs/heads/ignite-301 Commit: a67b72ca306c4b9211646fe3bc6bb7482b7c689e Parents: 7f923e0 Author: sboikov <sboi...@gridgain.com> Authored: Thu Feb 19 12:35:59 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Feb 19 12:36:02 2015 +0300 ---------------------------------------------------------------------- .../ClientAbstractMultiNodeSelfTest.java | 6 +- .../rest/AbstractRestProcessorSelfTest.java | 2 +- .../ignite/internal/ClusterGroupAdapter.java | 889 ------------------- .../apache/ignite/internal/ClusterGroupEx.java | 44 - .../internal/ClusterNodeLocalMapImpl.java | 103 --- .../apache/ignite/internal/GridKillTask.java | 103 --- .../ignite/internal/IgniteClusterAsyncImpl.java | 261 ------ .../ignite/internal/IgniteComputeImpl.java | 1 + .../ignite/internal/IgniteEventsImpl.java | 1 + .../org/apache/ignite/internal/IgniteEx.java | 14 +- .../apache/ignite/internal/IgniteKernal.java | 590 +++--------- .../ignite/internal/IgniteMessagingImpl.java | 1 + .../ignite/internal/IgniteServicesImpl.java | 1 + .../internal/cluster/ClusterGroupAdapter.java | 886 ++++++++++++++++++ .../ignite/internal/cluster/ClusterGroupEx.java | 44 + .../cluster/ClusterNodeLocalMapImpl.java | 104 +++ .../cluster/IgniteClusterAsyncImpl.java | 262 ++++++ .../internal/cluster/IgniteClusterEx.java | 27 + .../internal/cluster/IgniteClusterImpl.java | 508 +++++++++++ .../ignite/internal/cluster/IgniteKillTask.java | 103 +++ .../internal/executor/GridExecutorService.java | 1 + .../processors/cache/GridCacheAdapter.java | 16 +- .../processors/cache/IgniteCacheProxy.java | 8 +- .../GridDistributedCacheAdapter.java | 4 +- .../cache/query/GridCacheQueryManager.java | 2 +- .../continuous/CacheContinuousQueryManager.java | 4 +- .../dataload/IgniteDataLoaderImpl.java | 4 +- .../datastructures/GridCacheSetImpl.java | 4 +- .../handlers/cache/GridCacheCommandHandler.java | 4 +- .../cache/GridCacheQueryCommandHandler.java | 4 +- .../handlers/task/GridTaskCommandHandler.java | 4 +- .../top/GridTopologyCommandHandler.java | 2 +- .../streamer/GridStreamerContextImpl.java | 2 +- .../visor/cache/VisorCacheClearTask.java | 3 +- .../compute/VisorComputeCancelSessionsTask.java | 2 +- .../compute/VisorComputeResetMetricsTask.java | 2 +- .../VisorComputeToggleMonitoringTask.java | 2 +- .../visor/node/VisorNodeDataCollectorJob.java | 4 +- .../node/VisorNodeEventsCollectorTask.java | 2 +- .../internal/visor/node/VisorNodePingTask.java | 2 +- .../visor/query/VisorQueryCleanupTask.java | 3 +- .../visor/query/VisorQueryNextPageTask.java | 6 +- .../internal/visor/query/VisorQueryTask.java | 10 +- .../ignite/IgniteCacheAffinitySelfTest.java | 8 +- .../ignite/internal/ClusterMetricsSelfTest.java | 2 +- .../GridJobMasterLeaveAwareSelfTest.java | 8 +- .../internal/GridMultipleJobsSelfTest.java | 2 +- .../ignite/internal/GridProjectionSelfTest.java | 2 +- .../apache/ignite/internal/GridSelfTest.java | 13 +- .../GridAffinityProcessorAbstractSelfTest.java | 7 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 28 +- .../cache/GridCacheAffinityApiSelfTest.java | 18 +- .../cache/GridCacheEntryMemorySizeSelfTest.java | 4 +- ...GridCacheMixedPartitionExchangeSelfTest.java | 2 +- ...ridCacheQueueJoinedNodeSelfAbstractTest.java | 4 +- ...GridCacheQueueMultiNodeAbstractSelfTest.java | 4 +- ...dCacheSequenceMultiNodeAbstractSelfTest.java | 4 +- ...dCachePartitionedQueueEntryMoveSelfTest.java | 2 +- .../dht/GridCacheDhtEntrySelfTest.java | 2 +- ...GridCacheDhtEvictionNearReadersSelfTest.java | 4 +- .../dht/GridCacheDhtEvictionSelfTest.java | 2 +- .../dht/GridCacheDhtInternalEntrySelfTest.java | 2 +- .../near/GridCacheNearMultiNodeSelfTest.java | 4 +- .../near/GridCacheNearReadersSelfTest.java | 18 +- ...ePartitionedBasicStoreMultiNodeSelfTest.java | 12 +- ...titionedExplicitLockNodeFailureSelfTest.java | 4 +- ...achePartitionedMultiNodeFullApiSelfTest.java | 2 +- ...hePartitionedQueryMultiThreadedSelfTest.java | 2 +- ...CacheReplicatedPreloadLifecycleSelfTest.java | 2 +- ...ridCacheContinuousQueryAbstractSelfTest.java | 14 +- .../closure/GridClosureProcessorRemoteTest.java | 2 +- .../closure/GridClosureProcessorSelfTest.java | 14 +- .../continuous/GridEventConsumeSelfTest.java | 18 +- .../continuous/GridMessageListenSelfTest.java | 14 +- .../processors/igfs/IgfsSizeSelfTest.java | 4 +- .../processors/igfs/IgfsStreamsSelfTest.java | 2 +- .../GridServiceReassignmentSelfTest.java | 2 +- .../marshaller/GridMarshallerAbstractTest.java | 14 +- .../spi/GridTcpSpiForwardingSelfTest.java | 4 +- .../tcp/TcpDiscoverySpiWildcardSelfTest.java | 2 +- .../testframework/junits/GridAbstractTest.java | 4 +- .../IgfsHadoop20FileSystemAbstractSelfTest.java | 2 +- ...idHadoopDefaultMapReducePlannerSelfTest.java | 227 +---- .../GridCacheAbstractFieldsQuerySelfTest.java | 7 +- .../cache/GridCacheQueryMetricsSelfTest.java | 4 +- .../near/GridCachePartitionedQuerySelfTest.java | 4 +- .../GridCacheReplicatedFieldsQuerySelfTest.java | 2 +- .../tcp/GridOrderedMessageCancelSelfTest.java | 4 +- .../resource/GridServiceInjectionSelfTest.java | 4 +- .../visor/commands/ack/VisorAckCommand.scala | 4 +- .../commands/alert/VisorAlertCommand.scala | 11 +- .../commands/cache/VisorCacheClearCommand.scala | 4 +- .../commands/cache/VisorCacheCommand.scala | 6 +- .../cache/VisorCacheCompactCommand.scala | 4 +- .../commands/cache/VisorCacheScanCommand.scala | 4 +- .../commands/cache/VisorCacheSwapCommand.scala | 4 +- .../config/VisorConfigurationCommand.scala | 4 +- .../commands/disco/VisorDiscoveryCommand.scala | 6 +- .../commands/events/VisorEventsCommand.scala | 4 +- .../visor/commands/gc/VisorGcCommand.scala | 4 +- .../visor/commands/kill/VisorKillCommand.scala | 12 +- .../visor/commands/node/VisorNodeCommand.scala | 2 +- .../visor/commands/ping/VisorPingCommand.scala | 4 +- .../commands/start/VisorStartCommand.scala | 4 +- .../commands/tasks/VisorTasksCommand.scala | 12 +- .../commands/top/VisorTopologyCommand.scala | 4 +- .../visor/commands/vvm/VisorVvmCommand.scala | 6 +- .../scala/org/apache/ignite/visor/visor.scala | 32 +- 108 files changed, 2340 insertions(+), 2346 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java index c0fc287..be98957 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java @@ -439,7 +439,7 @@ public abstract class ClientAbstractMultiNodeSelfTest extends GridCommonAbstract keys.add(UUID.randomUUID()); for (Object key : keys) { - UUID nodeId = grid(0).mapKeyToNode(PARTITIONED_CACHE_NAME, key).id(); + UUID nodeId = grid(0).cluster().mapKeyToNode(PARTITIONED_CACHE_NAME, key).id(); UUID clientNodeId = partitioned.affinity(key); @@ -580,7 +580,7 @@ public abstract class ClientAbstractMultiNodeSelfTest extends GridCommonAbstract for (int i = 0; i < 100; i++) { String key = "key" + i; - UUID primaryNodeId = grid(0).mapKeyToNode(PARTITIONED_CACHE_NAME, key).id(); + UUID primaryNodeId = grid(0).cluster().mapKeyToNode(PARTITIONED_CACHE_NAME, key).id(); assertEquals("Affinity mismatch for key: " + key, primaryNodeId, partitioned.affinity(key)); @@ -604,7 +604,7 @@ public abstract class ClientAbstractMultiNodeSelfTest extends GridCommonAbstract for (int i = 100; i < 200; i++) { String pinnedKey = "key" + i; - UUID primaryNodeId = grid(0).mapKeyToNode(PARTITIONED_CACHE_NAME, pinnedKey).id(); + UUID primaryNodeId = grid(0).cluster().mapKeyToNode(PARTITIONED_CACHE_NAME, pinnedKey).id(); UUID pinnedNodeId = F.first(F.view(gridsByLocNode.keySet(), F.notEqualTo(primaryNodeId))); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java index 59d9c4f..36e2b67 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java @@ -52,7 +52,7 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { - assert grid(0).nodes().size() == gridCount(); + assert grid(0).cluster().nodes().size() == gridCount(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java deleted file mode 100644 index eb74712..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java +++ /dev/null @@ -1,889 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.executor.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.internal.IgniteNodeAttributes.*; - -/** - * - */ -public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Kernal context. */ - protected transient GridKernalContext ctx; - - /** Parent projection. */ - private transient ClusterGroup parent; - - /** Compute. */ - private transient IgniteComputeImpl compute; - - /** Messaging. */ - private transient IgniteMessagingImpl messaging; - - /** Events. */ - private transient IgniteEvents evts; - - /** Services. */ - private transient IgniteServices svcs; - - /** Grid name. */ - private String gridName; - - /** Subject ID. */ - private UUID subjId; - - /** Projection predicate. */ - protected IgnitePredicate<ClusterNode> p; - - /** Node IDs. */ - private Set<UUID> ids; - - /** - * Required by {@link Externalizable}. - */ - public ClusterGroupAdapter() { - // No-op. - } - - /** - * @param parent Parent of this projection. - * @param ctx Grid kernal context. - * @param p Predicate. - */ - protected ClusterGroupAdapter(@Nullable ClusterGroup parent, - @Nullable GridKernalContext ctx, - @Nullable UUID subjId, - @Nullable IgnitePredicate<ClusterNode> p) - { - this.parent = parent; - - if (ctx != null) - setKernalContext(ctx); - - this.subjId = subjId; - this.p = p; - - ids = null; - } - - /** - * @param parent Parent of this projection. - * @param ctx Grid kernal context. - * @param ids Node IDs. - */ - protected ClusterGroupAdapter(@Nullable ClusterGroup parent, - @Nullable GridKernalContext ctx, - @Nullable UUID subjId, - Set<UUID> ids) - { - this.parent = parent; - - if (ctx != null) - setKernalContext(ctx); - - assert ids != null; - - this.subjId = subjId; - this.ids = ids; - - p = F.nodeForNodeIds(ids); - } - - /** - * @param parent Parent of this projection. - * @param ctx Grid kernal context. - * @param p Predicate. - * @param ids Node IDs. - */ - private ClusterGroupAdapter(@Nullable ClusterGroup parent, - @Nullable GridKernalContext ctx, - @Nullable UUID subjId, - @Nullable IgnitePredicate<ClusterNode> p, - Set<UUID> ids) - { - this.parent = parent; - - if (ctx != null) - setKernalContext(ctx); - - this.subjId = subjId; - this.p = p; - this.ids = ids; - - if (p == null && ids != null) - this.p = F.nodeForNodeIds(ids); - } - - /** - * <tt>ctx.gateway().readLock()</tt> - */ - protected void guard() { - assert ctx != null; - - ctx.gateway().readLock(); - } - - /** - * <tt>ctx.gateway().readUnlock()</tt> - */ - protected void unguard() { - assert ctx != null; - - ctx.gateway().readUnlock(); - } - - /** - * <tt>ctx.gateway().lightCheck()</tt> - */ - protected void lightCheck() { - assert ctx != null; - - ctx.gateway().lightCheck(); - } - - /** - * Sets kernal context. - * - * @param ctx Kernal context to set. - */ - protected void setKernalContext(GridKernalContext ctx) { - assert ctx != null; - assert this.ctx == null; - - this.ctx = ctx; - - if (parent == null) - parent = ctx.grid(); - - gridName = ctx.gridName(); - } - - /** {@inheritDoc} */ - @Override public final Ignite ignite() { - assert ctx != null; - - guard(); - - try { - return ctx.grid(); - } - finally { - unguard(); - } - } - - /** - * @return {@link org.apache.ignite.IgniteCompute} for this projection. - */ - public final IgniteCompute compute() { - if (compute == null) { - assert ctx != null; - - compute = new IgniteComputeImpl(ctx, this, subjId, false); - } - - return compute; - } - - /** - * @return {@link org.apache.ignite.IgniteMessaging} for this projection. - */ - public final IgniteMessaging message() { - if (messaging == null) { - assert ctx != null; - - messaging = new IgniteMessagingImpl(ctx, this, false); - } - - return messaging; - } - - /** - * @return {@link org.apache.ignite.IgniteEvents} for this projection. - */ - public final IgniteEvents events() { - if (evts == null) { - assert ctx != null; - - evts = new IgniteEventsImpl(ctx, this, false); - } - - return evts; - } - - /** - * @return {@link org.apache.ignite.IgniteServices} for this projection. - */ - public IgniteServices services() { - if (svcs == null) { - assert ctx != null; - - svcs = new IgniteServicesImpl(ctx, this, false); - } - - return svcs; - } - - /** - * @return {@link ExecutorService} for this projection. - */ - public ExecutorService executorService() { - assert ctx != null; - - return new GridExecutorService(this, ctx); - } - - /** {@inheritDoc} */ - @Override public final ClusterMetrics metrics() { - guard(); - - try { - if (nodes().isEmpty()) - throw U.convertException(U.emptyTopologyException()); - - return new ClusterMetricsSnapshot(this); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public Collection<ClusterNode> nodes() { - guard(); - - try { - if (ids != null) { - if (ids.isEmpty()) - return Collections.emptyList(); - else if (ids.size() == 1) { - ClusterNode node = ctx.discovery().node(F.first(ids)); - - return node != null ? Collections.singleton(node) : Collections.<ClusterNode>emptyList(); - } - else { - Collection<ClusterNode> nodes = new ArrayList<>(ids.size()); - - for (UUID id : ids) { - ClusterNode node = ctx.discovery().node(id); - - if (node != null) - nodes.add(node); - } - - return nodes; - } - } - else { - Collection<ClusterNode> all = ctx.discovery().allNodes(); - - return p != null ? F.view(all, p) : all; - } - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public final ClusterNode node(UUID id) { - A.notNull(id, "id"); - - guard(); - - try { - if (ids != null) - return ids.contains(id) ? ctx.discovery().node(id) : null; - else { - ClusterNode node = ctx.discovery().node(id); - - return node != null && (p == null || p.apply(node)) ? node : null; - } - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public ClusterNode node() { - return F.first(nodes()); - } - - /** {@inheritDoc} */ - @Override public final IgnitePredicate<ClusterNode> predicate() { - return p != null ? p : F.<ClusterNode>alwaysTrue(); - } - - /** {@inheritDoc} */ - @Override public final ClusterGroup forPredicate(IgnitePredicate<ClusterNode> p) { - A.notNull(p, "p"); - - guard(); - - try { - return new ClusterGroupAdapter(this, ctx, subjId, this.p != null ? F.and(p, this.p) : p); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public final ClusterGroup forAttribute(String name, @Nullable final String val) { - A.notNull(name, "n"); - - return forPredicate(new AttributeFilter(name, val)); - } - - /** {@inheritDoc} */ - @Override public final ClusterGroup forNode(ClusterNode node, ClusterNode... nodes) { - A.notNull(node, "node"); - - guard(); - - try { - Set<UUID> nodeIds; - - if (F.isEmpty(nodes)) - nodeIds = contains(node) ? Collections.singleton(node.id()) : Collections.<UUID>emptySet(); - else { - nodeIds = U.newHashSet(nodes.length + 1); - - for (ClusterNode n : nodes) - if (contains(n)) - nodeIds.add(n.id()); - - if (contains(node)) - nodeIds.add(node.id()); - } - - return new ClusterGroupAdapter(this, ctx, subjId, nodeIds); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public final ClusterGroup forNodes(Collection<? extends ClusterNode> nodes) { - A.notEmpty(nodes, "nodes"); - - guard(); - - try { - Set<UUID> nodeIds = U.newHashSet(nodes.size()); - - for (ClusterNode n : nodes) - if (contains(n)) - nodeIds.add(n.id()); - - return new ClusterGroupAdapter(this, ctx, subjId, nodeIds); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public final ClusterGroup forNodeId(UUID id, UUID... ids) { - A.notNull(id, "id"); - - guard(); - - try { - Set<UUID> nodeIds; - - if (F.isEmpty(ids)) - nodeIds = contains(id) ? Collections.singleton(id) : Collections.<UUID>emptySet(); - else { - nodeIds = U.newHashSet(ids.length + 1); - - for (UUID id0 : ids) { - if (contains(id)) - nodeIds.add(id0); - } - - if (contains(id)) - nodeIds.add(id); - } - - return new ClusterGroupAdapter(this, ctx, subjId, nodeIds); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public final ClusterGroup forNodeIds(Collection<UUID> ids) { - A.notEmpty(ids, "ids"); - - guard(); - - try { - Set<UUID> nodeIds = U.newHashSet(ids.size()); - - for (UUID id : ids) { - if (contains(id)) - nodeIds.add(id); - } - - return new ClusterGroupAdapter(this, ctx, subjId, nodeIds); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public final ClusterGroup forOthers(ClusterNode node, ClusterNode... nodes) { - A.notNull(node, "node"); - - return forOthers(F.concat(false, node.id(), F.nodeIds(Arrays.asList(nodes)))); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forOthers(ClusterGroup prj) { - A.notNull(prj, "prj"); - - if (ids != null) { - guard(); - - try { - Set<UUID> nodeIds = U.newHashSet(ids.size()); - - for (UUID id : ids) { - ClusterNode n = node(id); - - if (n != null && !prj.predicate().apply(n)) - nodeIds.add(id); - } - - return new ClusterGroupAdapter(this, ctx, subjId, nodeIds); - } - finally { - unguard(); - } - } - else - return forPredicate(F.not(prj.predicate())); - } - - /** {@inheritDoc} */ - @Override public final ClusterGroup forRemotes() { - return forOthers(Collections.singleton(ctx.localNodeId())); - } - - /** - * @param excludeIds Node IDs. - * @return New projection. - */ - private ClusterGroup forOthers(Collection<UUID> excludeIds) { - assert excludeIds != null; - - if (ids != null) { - guard(); - - try { - Set<UUID> nodeIds = U.newHashSet(ids.size()); - - for (UUID id : ids) { - if (!excludeIds.contains(id)) - nodeIds.add(id); - } - - return new ClusterGroupAdapter(this, ctx, subjId, nodeIds); - } - finally { - unguard(); - } - } - else - return forPredicate(new OthersFilter(excludeIds)); - } - - /** {@inheritDoc} */ - @Override public final ClusterGroup forCacheNodes(@Nullable String cacheName) { - return forPredicate(new CachesFilter(cacheName, null)); - } - - /** {@inheritDoc} */ - @Override public final ClusterGroup forDataNodes(@Nullable String cacheName) { - return forPredicate(new CachesFilter(cacheName, CachesFilter.DATA_MODES)); - } - - /** {@inheritDoc} */ - @Override public final ClusterGroup forClientNodes(@Nullable String cacheName) { - return forPredicate(new CachesFilter(cacheName, CachesFilter.CLIENT_MODES)); - } - - /** {@inheritDoc} */ - @Override public final ClusterGroup forStreamer(@Nullable String streamerName, @Nullable String... streamerNames) { - return forPredicate(new StreamersFilter(streamerName, streamerNames)); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forCacheNodes(@Nullable String cacheName, - Set<CacheDistributionMode> distributionModes) { - return forPredicate(new CachesFilter(cacheName, distributionModes)); - } - - /** {@inheritDoc} */ - @Override public final ClusterGroup forHost(ClusterNode node) { - A.notNull(node, "node"); - - String macs = node.attribute(ATTR_MACS); - - assert macs != null; - - return forAttribute(ATTR_MACS, macs); - } - - /** {@inheritDoc} */ - @Override public final ClusterGroup forDaemons() { - return forPredicate(new DaemonFilter()); - } - - /** {@inheritDoc} */ - @Override public final ClusterGroup forRandom() { - return ids != null ? forNodeId(F.rand(ids)) : forNode(F.rand(nodes())); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forOldest() { - return new AgeProjection(this, true); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forYoungest() { - return new AgeProjection(this, false); - } - - /** {@inheritDoc} */ - @Override public ClusterGroupEx forSubjectId(UUID subjId) { - if (subjId == null) - return this; - - guard(); - - try { - return ids != null ? new ClusterGroupAdapter(this, ctx, subjId, ids) : - new ClusterGroupAdapter(this, ctx, subjId, p); - } - finally { - unguard(); - } - } - - /** - * @param n Node. - * @return Whether node belongs to this projection. - */ - private boolean contains(ClusterNode n) { - assert n != null; - - return ids != null ? ids.contains(n.id()) : p == null || p.apply(n); - } - - /** - * @param id Node ID. - * @return Whether node belongs to this projection. - */ - private boolean contains(UUID id) { - assert id != null; - - if (ids != null) - return ids.contains(id); - else { - ClusterNode n = ctx.discovery().node(id); - - return n != null && (p == null || p.apply(n)); - } - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, gridName); - U.writeUuid(out, subjId); - - out.writeBoolean(ids != null); - - if (ids != null) - out.writeObject(ids); - else - out.writeObject(p); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - gridName = U.readString(in); - subjId = U.readUuid(in); - - if (in.readBoolean()) - ids = (Set<UUID>)in.readObject(); - else - p = (IgnitePredicate<ClusterNode>)in.readObject(); - } - - /** - * Reconstructs object on unmarshalling. - * - * @return Reconstructed object. - * @throws ObjectStreamException Thrown in case of unmarshalling error. - */ - protected Object readResolve() throws ObjectStreamException { - try { - IgniteKernal g = IgnitionEx.gridx(gridName); - - return ids != null ? new ClusterGroupAdapter(g, g.context(), subjId, ids) : - p != null ? new ClusterGroupAdapter(g, g.context(), subjId, p) : g; - } - catch (IllegalStateException e) { - throw U.withCause(new InvalidObjectException(e.getMessage()), e); - } - } - - /** - */ - private static class CachesFilter implements IgnitePredicate<ClusterNode> { - /** */ - private static final Set<CacheDistributionMode> DATA_MODES = EnumSet.of(CacheDistributionMode.NEAR_PARTITIONED, - CacheDistributionMode.PARTITIONED_ONLY); - - /** */ - private static final Set<CacheDistributionMode> CLIENT_MODES = EnumSet.of(CacheDistributionMode.CLIENT_ONLY, - CacheDistributionMode.NEAR_ONLY); - - /** */ - private static final long serialVersionUID = 0L; - - /** Cache name. */ - private final String cacheName; - - /** */ - private final Set<CacheDistributionMode> distributionMode; - - /** - * @param cacheName Cache name. - * @param distributionMode Filter by {@link org.apache.ignite.configuration.CacheConfiguration#getDistributionMode()}. - */ - private CachesFilter(@Nullable String cacheName, @Nullable Set<CacheDistributionMode> distributionMode) { - this.cacheName = cacheName; - this.distributionMode = distributionMode; - } - - /** {@inheritDoc} */ - @Override public boolean apply(ClusterNode n) { - GridCacheAttributes[] caches = n.attribute(ATTR_CACHE); - - if (caches != null) { - for (GridCacheAttributes attrs : caches) { - if (Objects.equals(cacheName, attrs.cacheName()) - && (distributionMode == null || distributionMode.contains(attrs.partitionedTaxonomy()))) - return true; - } - } - - return false; - } - } - - /** - */ - private static class StreamersFilter implements IgnitePredicate<ClusterNode> { - /** */ - private static final long serialVersionUID = 0L; - - /** Streamer name. */ - private final String streamerName; - - /** Streamer names. */ - private final String[] streamerNames; - - /** - * @param streamerName Streamer name. - * @param streamerNames Streamer names. - */ - private StreamersFilter(@Nullable String streamerName, @Nullable String[] streamerNames) { - this.streamerName = streamerName; - this.streamerNames = streamerNames; - } - - /** {@inheritDoc} */ - @Override public boolean apply(ClusterNode n) { - if (!U.hasStreamer(n, streamerName)) - return false; - - if (!F.isEmpty(streamerNames)) - for (String sn : streamerNames) - if (!U.hasStreamer(n, sn)) - return false; - - return true; - } - } - - /** - */ - private static class AttributeFilter implements IgnitePredicate<ClusterNode> { - /** */ - private static final long serialVersionUID = 0L; - - /** Name. */ - private final String name; - - /** Value. */ - private final String val; - - /** - * @param name Name. - * @param val Value. - */ - private AttributeFilter(String name, String val) { - this.name = name; - this.val = val; - } - - /** {@inheritDoc} */ - @Override public boolean apply(ClusterNode n) { - return val == null ? n.attributes().containsKey(name) : val.equals(n.attribute(name)); - } - } - - /** - */ - private static class DaemonFilter implements IgnitePredicate<ClusterNode> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public boolean apply(ClusterNode n) { - return n.isDaemon(); - } - } - - /** - */ - private static class OthersFilter implements IgnitePredicate<ClusterNode> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final Collection<UUID> nodeIds; - - /** - * @param nodeIds Node IDs. - */ - private OthersFilter(Collection<UUID> nodeIds) { - this.nodeIds = nodeIds; - } - - /** {@inheritDoc} */ - @Override public boolean apply(ClusterNode n) { - return !nodeIds.contains(n.id()); - } - } - - /** - * Age-based projection. - */ - private static class AgeProjection extends ClusterGroupAdapter { - /** Serialization version. */ - private static final long serialVersionUID = 0L; - - /** Oldest flag. */ - private boolean isOldest; - - /** Selected node. */ - private volatile ClusterNode node; - - /** Last topology version. */ - private volatile long lastTopVer; - - /** - * Required for {@link Externalizable}. - */ - public AgeProjection() { - // No-op. - } - - /** - * @param prj Parent projection. - * @param isOldest Oldest flag. - */ - private AgeProjection(ClusterGroupAdapter prj, boolean isOldest) { - super(prj.parent, prj.ctx, prj.subjId, prj.p, prj.ids); - - this.isOldest = isOldest; - - reset(); - } - - /** - * Resets node. - */ - private synchronized void reset() { - guard(); - - try { - lastTopVer = ctx.discovery().topologyVersion(); - - this.node = isOldest ? U.oldest(super.nodes(), null) : U.youngest(super.nodes(), null); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public ClusterNode node() { - if (ctx.discovery().topologyVersion() != lastTopVer) - reset(); - - return node; - } - - /** {@inheritDoc} */ - @Override public Collection<ClusterNode> nodes() { - if (ctx.discovery().topologyVersion() != lastTopVer) - reset(); - - ClusterNode node = this.node; - - return node == null ? Collections.<ClusterNode>emptyList() : Collections.singletonList(node); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupEx.java b/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupEx.java deleted file mode 100644 index faaab85..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupEx.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Internal projection interface. - */ -public interface ClusterGroupEx extends ClusterGroup { - /** - * Creates projection for specified subject ID. - * - * @param subjId Subject ID. - * @return Internal projection. - */ - public ClusterGroupEx forSubjectId(UUID subjId); - - /** - * @param cacheName Cache name. - * @param distributionModes Cache distribution modes. - * @return Cluster group. - */ - public ClusterGroup forCacheNodes(@Nullable String cacheName, Set<CacheDistributionMode> distributionModes); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/ClusterNodeLocalMapImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ClusterNodeLocalMapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/ClusterNodeLocalMapImpl.java deleted file mode 100644 index c5378a3..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/ClusterNodeLocalMapImpl.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.concurrent.*; - -/** - * - */ -public class ClusterNodeLocalMapImpl<K, V> extends ConcurrentHashMap8<K, V> implements ClusterNodeLocalMap<K, V>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private static final ThreadLocal<String> stash = new ThreadLocal<>(); - - /** */ - private GridKernalContext ctx; - - /** - * No-arg constructor is required by externalization. - */ - public ClusterNodeLocalMapImpl() { - // No-op. - } - - /** - * - * @param ctx Kernal context. - */ - ClusterNodeLocalMapImpl(GridKernalContext ctx) { - assert ctx != null; - - this.ctx = ctx; - } - - /** {@inheritDoc} */ - @Nullable @Override public V addIfAbsent(K key, @Nullable Callable<V> dflt) { - return F.addIfAbsent(this, key, dflt); - } - - /** {@inheritDoc} */ - @Override public V addIfAbsent(K key, V val) { - return F.addIfAbsent(this, key, val); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, ctx.gridName()); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - stash.set(U.readString(in)); - } - - /** - * Reconstructs object on unmarshalling. - * - * @return Reconstructed object. - * @throws ObjectStreamException Thrown in case of unmarshalling error. - */ - protected Object readResolve() throws ObjectStreamException { - try { - return IgnitionEx.gridx(stash.get()).nodeLocalMap(); - } - catch (IllegalStateException e) { - throw U.withCause(new InvalidObjectException(e.getMessage()), e); - } - finally { - stash.remove(); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ClusterNodeLocalMapImpl.class, this); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/GridKillTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKillTask.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKillTask.java deleted file mode 100644 index 743c0d6..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKillTask.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.processors.task.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; - -import static org.apache.ignite.internal.IgniteNodeAttributes.*; - -/** - * Special kill task that never fails over jobs. - */ -@GridInternal -class GridKillTask extends ComputeTaskAdapter<Boolean, Void> { - /** */ - private static final long serialVersionUID = 0L; - - /** Restart flag. */ - private boolean restart; - - /** {@inheritDoc} */ - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Boolean restart) { - assert restart != null; - - this.restart = restart; - - Map<ComputeJob, ClusterNode> jobs = U.newHashMap(subgrid.size()); - - for (ClusterNode n : subgrid) - if (!daemon(n)) - jobs.put(new GridKillJob(), n); - - return jobs; - } - - /** - * Checks if given node is a daemon node. - * - * @param n Node. - * @return Whether node is daemon. - */ - private boolean daemon(ClusterNode n) { - return "true".equalsIgnoreCase(n.<String>attribute(ATTR_DAEMON)); - } - - /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { - return ComputeJobResultPolicy.WAIT; - } - - /** {@inheritDoc} */ - @Override public Void reduce(List<ComputeJobResult> results) { - return null; - } - - /** - * Kill job. - */ - private class GridKillJob extends ComputeJobAdapter { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public Object execute() { - if (restart) - new Thread(new Runnable() { - @Override public void run() { - G.restart(true); - } - }, - "ignite-restarter").start(); - else - new Thread(new Runnable() { - @Override public void run() { - G.kill(true); - } - }, - "ignite-stopper").start(); - - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java deleted file mode 100644 index cc28113..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * - */ -public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> implements IgniteCluster { - /** */ - private final IgniteKernal grid; - - /** - * @param grid Grid. - */ - public IgniteClusterAsyncImpl(IgniteKernal grid) { - super(true); - - this.grid = grid; - } - - /** {@inheritDoc} */ - @Override public ClusterNode localNode() { - return grid.localNode(); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forLocal() { - return grid.forLocal(); - } - - /** {@inheritDoc} */ - @Override public <K, V> ClusterNodeLocalMap<K, V> nodeLocalMap() { - return grid.nodeLocalMap(); - } - - /** {@inheritDoc} */ - @Override public boolean pingNode(UUID nodeId) { - return grid.pingNode(nodeId); - } - - /** {@inheritDoc} */ - @Override public long topologyVersion() { - return grid.topologyVersion(); - } - - /** {@inheritDoc} */ - @Nullable @Override public Collection<ClusterNode> topology(long topVer) { - return grid.topology(topVer); - } - - /** {@inheritDoc} */ - @Override public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable String cacheName, - @Nullable Collection<? extends K> keys) { - return grid.mapKeysToNodes(cacheName, keys); - } - - /** {@inheritDoc} */ - @Nullable @Override public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key) { - return grid.mapKeyToNode(cacheName, key); - } - - /** {@inheritDoc} */ - @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(File file, - boolean restart, - int timeout, - int maxConn) - { - try { - return saveOrGet(grid.startNodesAsync(file, restart, timeout, maxConn)); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public Collection<GridTuple3<String, Boolean, String>> startNodes( - Collection<Map<String, Object>> hosts, - @Nullable Map<String, Object> dflts, - boolean restart, - int timeout, - int maxConn) - { - try { - return saveOrGet(grid.startNodesAsync(hosts, dflts, restart, timeout, maxConn)); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public void stopNodes() { - grid.stopNodes(); - } - - /** {@inheritDoc} */ - @Override public void stopNodes(Collection<UUID> ids) { - grid.stopNodes(ids); - } - - /** {@inheritDoc} */ - @Override public void restartNodes() { - grid.restartNodes(); - } - - /** {@inheritDoc} */ - @Override public void restartNodes(Collection<UUID> ids) { - grid.restartNodes(ids); - } - - /** {@inheritDoc} */ - @Override public void resetMetrics() { - grid.resetMetrics(); - } - - /** {@inheritDoc} */ - @Override public Ignite ignite() { - return grid.ignite(); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forNodes(Collection<? extends ClusterNode> nodes) { - return grid.forNodes(nodes); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forNode(ClusterNode node, ClusterNode... nodes) { - return grid.forNode(node, nodes); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forOthers(ClusterNode node, ClusterNode... nodes) { - return grid.forOthers(node, nodes); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forOthers(ClusterGroup prj) { - return grid.forOthers(prj); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forNodeIds(Collection<UUID> ids) { - return grid.forNodeIds(ids); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forNodeId(UUID id, UUID... ids) { - return grid.forNodeId(id, ids); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forPredicate(IgnitePredicate<ClusterNode> p) { - return grid.forPredicate(p); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forAttribute(String name, @Nullable String val) { - return grid.forAttribute(name, val); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forCacheNodes(String cacheName) { - return grid.forCacheNodes(cacheName); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forDataNodes(String cacheName) { - return grid.forDataNodes(cacheName); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forClientNodes(String cacheName) { - return grid.forClientNodes(cacheName); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forStreamer(String streamerName, @Nullable String... streamerNames) { - return grid.forStreamer(streamerName, streamerNames); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forRemotes() { - return grid.forRemotes(); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forHost(ClusterNode node) { - return grid.forHost(node); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forDaemons() { - return grid.forDaemons(); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forRandom() { - return grid.forRandom(); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forOldest() { - return grid.forOldest(); - } - - /** {@inheritDoc} */ - @Override public ClusterGroup forYoungest() { - return grid.forYoungest(); - } - - /** {@inheritDoc} */ - @Override public Collection<ClusterNode> nodes() { - return grid.nodes(); - } - - /** {@inheritDoc} */ - @Nullable @Override public ClusterNode node(UUID id) { - return grid.node(id); - } - - /** {@inheritDoc} */ - @Nullable @Override public ClusterNode node() { - return grid.node(); - } - - /** {@inheritDoc} */ - @Override public IgnitePredicate<ClusterNode> predicate() { - return grid.predicate(); - } - - /** {@inheritDoc} */ - @Override public ClusterMetrics metrics() { - return grid.metrics(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java index 373601c..85fefa5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; +import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java index 9e9f84f..c27be6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; +import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java index 19d2ff8..3c35a08 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java @@ -19,6 +19,8 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.lang.*; @@ -29,7 +31,7 @@ import java.util.*; /** * Extended Grid interface which provides some additional methods required for kernal and Visor. */ -public interface IgniteEx extends Ignite, ClusterGroupEx, IgniteCluster { +public interface IgniteEx extends Ignite { /** * Gets utility cache. * @@ -124,10 +126,20 @@ public interface IgniteEx extends Ignite, ClusterGroupEx, IgniteCluster { */ public GridHadoop hadoop(); + /** {@inheritDoc} */ + @Override IgniteClusterEx cluster(); + /** * Get latest version in string form. * * @return Latest version. */ @Nullable public String latestVersion(); + + /** + * Gets local grid node. + * + * @return Local grid node. + */ + public ClusterNode localNode(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 44e92d8..caea468 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -22,6 +22,7 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.*; import org.apache.ignite.internal.managers.checkpoint.*; import org.apache.ignite.internal.managers.collision.*; @@ -60,9 +61,7 @@ import org.apache.ignite.internal.processors.streamer.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.nodestart.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -79,7 +78,6 @@ import javax.management.*; import java.io.*; import java.lang.management.*; import java.lang.reflect.*; -import java.net.*; import java.text.*; import java.util.*; import java.util.concurrent.*; @@ -87,10 +85,9 @@ import java.util.concurrent.atomic.*; import static org.apache.ignite.IgniteSystemProperties.*; import static org.apache.ignite.internal.GridKernalState.*; -import static org.apache.ignite.internal.IgniteVersionUtils.*; import static org.apache.ignite.internal.IgniteComponentType.*; import static org.apache.ignite.internal.IgniteNodeAttributes.*; -import static org.apache.ignite.internal.util.nodestart.IgniteNodeStartUtils.*; +import static org.apache.ignite.internal.IgniteVersionUtils.*; import static org.apache.ignite.lifecycle.LifecycleEventType.*; /** @@ -99,7 +96,7 @@ import static org.apache.ignite.lifecycle.LifecycleEventType.*; * See <a href="http://en.wikipedia.org/wiki/Kernal">http://en.wikipedia.org/wiki/Kernal</a> for information on the * misspelling. */ -public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, IgniteMXBean { +public class IgniteKernal implements IgniteEx, IgniteMXBean { /** */ private static final long serialVersionUID = 0L; @@ -122,6 +119,10 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit private static final long PERIODIC_STARVATION_CHECK_FREQ = 1000 * 30; /** */ + @GridToStringExclude + private GridKernalContextImpl ctx; + + /** */ private IgniteConfiguration cfg; /** */ @@ -182,9 +183,9 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit @GridToStringExclude private boolean errOnStop; - /** Node local store. */ + /** Cluster. */ @GridToStringExclude - private ClusterNodeLocalMap nodeLoc; + private IgniteClusterImpl cluster; /** Scheduler. */ @GridToStringExclude @@ -217,8 +218,6 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit * @param rsrcCtx Optional Spring application context. */ public IgniteKernal(@Nullable GridSpringResourceContext rsrcCtx) { - super(null, null, null, (IgnitePredicate<ClusterNode>)null); - this.rsrcCtx = rsrcCtx; String[] compatibleVers = COMPATIBLE_VERS.split(","); @@ -230,13 +229,43 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit } /** {@inheritDoc} */ - @Override public IgniteCluster cluster() { - return this; + @Override public IgniteClusterEx cluster() { + return cluster; + } + + /** {@inheritDoc} */ + @Override public ClusterNode localNode() { + return cluster.localNode(); + } + + /** {@inheritDoc} */ + @Override public IgniteCompute compute() { + return cluster.compute(); + } + + /** {@inheritDoc} */ + @Override public IgniteMessaging message() { + return cluster.message(); + } + + /** {@inheritDoc} */ + @Override public IgniteEvents events() { + return cluster.events(); + } + + /** {@inheritDoc} */ + @Override public IgniteServices services() { + return cluster.services(); + } + + /** {@inheritDoc} */ + @Override public ExecutorService executorService() { + return cluster.executorService(); } /** {@inheritDoc} */ @Override public final IgniteCompute compute(ClusterGroup grp) { - return ((ClusterGroupAdapter) grp).compute(); + return ((ClusterGroupAdapter)grp).compute(); } /** {@inheritDoc} */ @@ -246,17 +275,17 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit /** {@inheritDoc} */ @Override public final IgniteEvents events(ClusterGroup grp) { - return ((ClusterGroupAdapter) grp).events(); + return ((ClusterGroupAdapter)grp).events(); } /** {@inheritDoc} */ @Override public IgniteServices services(ClusterGroup grp) { - return ((ClusterGroupAdapter) grp).services(); + return ((ClusterGroupAdapter)grp).services(); } /** {@inheritDoc} */ @Override public ExecutorService executorService(ClusterGroup grp) { - return ((ClusterGroupAdapter) grp).executorService(); + return ((ClusterGroupAdapter)grp).executorService(); } /** {@inheritDoc} */ @@ -503,20 +532,29 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit } /** - * @param cfg Grid configuration to use. + * @param cfg Configuration to use. * @param utilityCachePool Utility cache pool. - * @param execSvc - * @param sysExecSvc - * @param p2pExecSvc - * @param mgmtExecSvc - * @param igfsExecSvc - * @param errHnd Error handler to use for notification about startup problems. @throws IgniteCheckedException Thrown in case of any errors. + * @param execSvc Executor service. + * @param sysExecSvc System executor service. + * @param p2pExecSvc P2P executor service. + * @param mgmtExecSvc Management executor service. + * @param igfsExecSvc IGFS executor service. + * @param restExecSvc Reset executor service. + * @param errHnd Error handler to use for notification about startup problems. + * @throws IgniteCheckedException Thrown in case of any errors. */ @SuppressWarnings({"CatchGenericClass", "unchecked"}) - public void start(final IgniteConfiguration cfg, ExecutorService utilityCachePool, final ExecutorService execSvc, - final ExecutorService sysExecSvc, ExecutorService p2pExecSvc, ExecutorService mgmtExecSvc, - ExecutorService igfsExecSvc, ExecutorService restExecSvc, GridAbsClosure errHnd) - throws IgniteCheckedException { + public void start(final IgniteConfiguration cfg, + ExecutorService utilityCachePool, + final ExecutorService execSvc, + final ExecutorService sysExecSvc, + ExecutorService p2pExecSvc, + ExecutorService mgmtExecSvc, + ExecutorService igfsExecSvc, + ExecutorService restExecSvc, + GridAbsClosure errHnd) + throws IgniteCheckedException + { gw.compareAndSet(null, new GridKernalGatewayImpl(cfg.getGridName())); GridKernalGateway gw = this.gw.get(); @@ -623,16 +661,23 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit // Spin out SPIs & managers. try { - GridKernalContextImpl ctx = - new GridKernalContextImpl(log, this, cfg, gw, utilityCachePool, execSvc, sysExecSvc, p2pExecSvc, - mgmtExecSvc, igfsExecSvc, restExecSvc); + ctx = new GridKernalContextImpl(log, + this, + cfg, + gw, + utilityCachePool, + execSvc, + sysExecSvc, + p2pExecSvc, + mgmtExecSvc, + igfsExecSvc, + restExecSvc); - nodeLoc = new ClusterNodeLocalMapImpl(ctx); + cluster = new IgniteClusterImpl(); - U.onGridStart(); + cluster.setKernalContext(ctx); - // Set context into rich adapter. - setKernalContext(ctx); + U.onGridStart(); // Start and configure resource processor first as it contains resources used // by all other managers and processors. @@ -645,10 +690,12 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit startProcessor(ctx, rsrcProc, attrs); // Inject resources into lifecycle beans. - if (!cfg.isDaemon() && cfg.getLifecycleBeans() != null) - for (LifecycleBean bean : cfg.getLifecycleBeans()) + if (!cfg.isDaemon() && cfg.getLifecycleBeans() != null) { + for (LifecycleBean bean : cfg.getLifecycleBeans()) { if (bean != null) rsrcProc.inject(bean); + } + } // Lifecycle notification. notifyLifecycleBeans(BEFORE_GRID_START); @@ -806,7 +853,7 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit // Setup periodic version check. updateNtfTimer.scheduleAtFixedRate(new GridTimerTask() { @Override public void safeRun() throws InterruptedException { - verChecker0.topologySize(nodes().size()); + verChecker0.topologySize(cluster().nodes().size()); verChecker0.checkForNewVersion(execSvc, log); @@ -867,7 +914,7 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit @Override protected void safeRun() { if (log.isInfoEnabled()) { - ClusterMetrics m = localNode().metrics(); + ClusterMetrics m = cluster().localNode().metrics(); double cpuLoadPct = m.getCurrentCpuLoad() * 100; double avgCpuLoadPct = m.getAverageCpuLoad() * 100; @@ -886,9 +933,9 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit int cpus = 0; try { - ClusterMetrics metrics = metrics(); + ClusterMetrics metrics = cluster().metrics(); - Collection<ClusterNode> nodes0 = nodes(); + Collection<ClusterNode> nodes0 = cluster().nodes(); hosts = U.neighborhood(nodes0).size(); nodes = nodes0.size(); @@ -1525,9 +1572,11 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit * @param rtBean Java runtime bean. */ private void ackStart(RuntimeMXBean rtBean) { + ClusterNode locNode = localNode(); + if (log.isQuiet()) { U.quiet(false, ""); - U.quiet(false, "Ignite node started OK (id=" + U.id8(localNode().id()) + + U.quiet(false, "Ignite node started OK (id=" + U.id8(locNode.id()) + (F.isEmpty(gridName) ? "" : ", grid=" + gridName) + ')'); } @@ -1549,15 +1598,15 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit ">>> " + ack + NL + ">>> " + dash + NL + ">>> OS name: " + U.osString() + NL + - ">>> CPU(s): " + localNode().metrics().getTotalCpus() + NL + - ">>> Heap: " + U.heapSize(localNode(), 2) + "GB" + NL + + ">>> CPU(s): " + locNode.metrics().getTotalCpus() + NL + + ">>> Heap: " + U.heapSize(locNode, 2) + "GB" + NL + ">>> VM name: " + rtBean.getName() + NL + ">>> Grid name: " + gridName + NL + ">>> Local node [" + - "ID=" + localNode().id().toString().toUpperCase() + - ", order=" + localNode().order() + + "ID=" + locNode.id().toString().toUpperCase() + + ", order=" + locNode.order() + "]" + NL + - ">>> Local node addresses: " + U.addressesAsString(localNode()) + NL + + ">>> Local node addresses: " + U.addressesAsString(locNode) + NL + ">>> Local ports: " + sb + NL; str += ">>> Ignite documentation: http://" + SITE + "/documentation" + NL; @@ -1660,9 +1709,6 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit * @param cancel Whether or not to cancel running jobs. */ private void stop0(boolean cancel) { - String nid = getLocalNodeId().toString().toUpperCase(); - String nid8 = U.id8(getLocalNodeId()).toUpperCase(); - gw.compareAndSet(null, new GridKernalGatewayImpl(gridName)); GridKernalGateway gw = this.gw.get(); @@ -1742,11 +1788,13 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit try { assert gw.getState() == STARTED || gw.getState() == STARTING; + ClusterNodeLocalMap locMap = cluster.nodeLocalMap(); + // No more kernal calls from this point on. gw.setState(STOPPING); // Clear node local store. - nodeLoc.clear(); + locMap.clear(); if (log.isDebugEnabled()) log.debug("Grid " + (gridName == null ? "" : '\'' + gridName + "' ") + "is stopping."); @@ -2132,31 +2180,7 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit @Override public boolean pingNode(String nodeId) { A.notNull(nodeId, "nodeId"); - return pingNode(UUID.fromString(nodeId)); - } - - /** {@inheritDoc} */ - @Override public Collection<ClusterNode> topology(long topVer) { - guard(); - - try { - return ctx.discovery().topology(topVer); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public long topologyVersion() { - guard(); - - try { - return ctx.discovery().topologyVersion(); - } - finally { - unguard(); - } + return cluster().pingNode(UUID.fromString(nodeId)); } /** {@inheritDoc} */ @@ -2187,7 +2211,7 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit guard(); try { - for (ClusterNode n : nodes()) + for (ClusterNode n : cluster().nodes()) if (n.addresses().contains(host)) return ctx.discovery().pingNode(n.id()); @@ -2199,341 +2223,6 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit } /** {@inheritDoc} */ - @Override public ClusterNode localNode() { - guard(); - - try { - ClusterNode node = ctx.discovery().localNode(); - - assert node != null; - - return node; - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings({"unchecked"}) - @Override public <K, V> ClusterNodeLocalMap<K, V> nodeLocalMap() { - guard(); - - try { - return nodeLoc; - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public boolean pingNode(UUID nodeId) { - A.notNull(nodeId, "nodeId"); - - guard(); - - try { - return ctx.discovery().pingNode(nodeId); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(File file, - boolean restart, - int timeout, - int maxConn) { - try { - return startNodesAsync(file, restart, timeout, maxConn).get(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - /** - * @param file Configuration file. - * @param restart Whether to stop existing nodes. - * @param timeout Connection timeout. - * @param maxConn Number of parallel SSH connections to one host. - * @return Future with results. - * @see {@link IgniteCluster#startNodes(java.io.File, boolean, int, int)}. - */ - IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync(File file, - boolean restart, - int timeout, - int maxConn) - { - A.notNull(file, "file"); - A.ensure(file.exists(), "file doesn't exist."); - A.ensure(file.isFile(), "file is a directory."); - - try { - IgniteBiTuple<Collection<Map<String, Object>>, Map<String, Object>> t = parseFile(file); - - return startNodesAsync(t.get1(), t.get2(), restart, timeout, maxConn); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(ctx, e); - } - } - - /** {@inheritDoc} */ - @Override public IgniteCluster withAsync() { - return new IgniteClusterAsyncImpl(this); - } - - /** {@inheritDoc} */ - @Override public boolean isAsync() { - return false; - } - - /** {@inheritDoc} */ - @Override public <R> IgniteFuture<R> future() { - throw new IllegalStateException("Asynchronous mode is not enabled."); - } - - /** {@inheritDoc} */ - @Override public Collection<GridTuple3<String, Boolean, String>> startNodes( - Collection<Map<String, Object>> hosts, - @Nullable Map<String, Object> dflts, - boolean restart, - int timeout, - int maxConn) - { - try { - return startNodesAsync(hosts, dflts, restart, timeout, maxConn).get(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - /** - * @param hosts Startup parameters. - * @param dflts Default values. - * @param restart Whether to stop existing nodes - * @param timeout Connection timeout in milliseconds. - * @param maxConn Number of parallel SSH connections to one host. - * @return Future with results. - * @see {@link IgniteCluster#startNodes(java.util.Collection, java.util.Map, boolean, int, int)}. - */ - IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync( - Collection<Map<String, Object>> hosts, - @Nullable Map<String, Object> dflts, - boolean restart, - int timeout, - int maxConn) - { - A.notNull(hosts, "hosts"); - - guard(); - - try { - IgniteSshProcessor sshProcessor = IgniteComponentType.SSH.create(false); - - Map<String, Collection<IgniteRemoteStartSpecification>> specsMap = specifications(hosts, dflts); - - Map<String, ConcurrentLinkedQueue<IgniteNodeCallable>> runMap = new HashMap<>(); - - int nodeCallCnt = 0; - - for (String host : specsMap.keySet()) { - InetAddress addr; - - try { - addr = InetAddress.getByName(host); - } - catch (UnknownHostException e) { - throw new IgniteCheckedException("Invalid host name: " + host, e); - } - - Collection<? extends ClusterNode> neighbors = null; - - if (addr.isLoopbackAddress()) - neighbors = neighbors(); - else { - for (Collection<ClusterNode> p : U.neighborhood(nodes()).values()) { - ClusterNode node = F.first(p); - - if (node.<String>attribute(ATTR_IPS).contains(addr.getHostAddress())) { - neighbors = p; - - break; - } - } - } - - int startIdx = 1; - - if (neighbors != null) { - if (restart && !neighbors.isEmpty()) { - try { - compute(forNodes(neighbors)).execute(GridKillTask.class, false); - } - catch (ClusterGroupEmptyException ignored) { - // No-op, nothing to restart. - } - } - else - startIdx = neighbors.size() + 1; - } - - ConcurrentLinkedQueue<IgniteNodeCallable> nodeRuns = new ConcurrentLinkedQueue<>(); - - runMap.put(host, nodeRuns); - - for (IgniteRemoteStartSpecification spec : specsMap.get(host)) { - assert spec.host().equals(host); - - for (int i = startIdx; i <= spec.nodes(); i++) { - nodeRuns.add(sshProcessor.nodeStartCallable(spec, timeout)); - - nodeCallCnt++; - } - } - } - - // If there is nothing to start, return finished future with empty result. - if (nodeCallCnt == 0) - return new GridFinishedFuture<Collection<GridTuple3<String, Boolean, String>>>( - ctx, Collections.<GridTuple3<String, Boolean, String>>emptyList()); - - // Exceeding max line width for readability. - GridCompoundFuture<GridTuple3<String, Boolean, String>, Collection<GridTuple3<String, Boolean, String>>> - fut = new GridCompoundFuture<>( - ctx, - CU.<GridTuple3<String, Boolean, String>>objectsReducer() - ); - - AtomicInteger cnt = new AtomicInteger(nodeCallCnt); - - // Limit maximum simultaneous connection number per host. - for (ConcurrentLinkedQueue<IgniteNodeCallable> queue : runMap.values()) { - for (int i = 0; i < maxConn; i++) { - if (!runNextNodeCallable(queue, fut, cnt)) - break; - } - } - - return fut; - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(ctx, e); - } - finally { - unguard(); - } - } - - /** - * Gets the all grid nodes that reside on the same physical computer as local grid node. - * Local grid node is excluded. - * <p> - * Detection of the same physical computer is based on comparing set of network interface MACs. - * If two nodes have the same set of MACs, Ignite considers these nodes running on the same - * physical computer. - * @return Grid nodes that reside on the same physical computer as local grid node. - */ - private Collection<ClusterNode> neighbors() { - Collection<ClusterNode> neighbors = new ArrayList<>(1); - - String macs = localNode().attribute(ATTR_MACS); - - assert macs != null; - - for (ClusterNode n : forOthers(localNode()).nodes()) { - if (macs.equals(n.attribute(ATTR_MACS))) - neighbors.add(n); - } - - return neighbors; - } - - /** - * Runs next callable from host node start queue. - * - * @param queue Queue of tasks to poll from. - * @param comp Compound future that comprise all started node tasks. - * @param cnt Atomic counter to check if all futures are added to compound future. - * @return {@code True} if task was started, {@code false} if queue was empty. - */ - private boolean runNextNodeCallable(final ConcurrentLinkedQueue<IgniteNodeCallable> queue, - final GridCompoundFuture<GridTuple3<String, Boolean, String>, - Collection<GridTuple3<String, Boolean, String>>> comp, final AtomicInteger cnt) { - IgniteNodeCallable call = queue.poll(); - - if (call == null) - return false; - - IgniteInternalFuture<GridTuple3<String, Boolean, String>> fut = ctx.closure().callLocalSafe(call, true); - - comp.add(fut); - - if (cnt.decrementAndGet() == 0) - comp.markInitialized(); - - fut.listenAsync(new CI1<IgniteInternalFuture<GridTuple3<String, Boolean, String>>>() { - @Override public void apply(IgniteInternalFuture<GridTuple3<String, Boolean, String>> f) { - runNextNodeCallable(queue, comp, cnt); - } - }); - - return true; - } - - /** {@inheritDoc} */ - @Override public void stopNodes() { - guard(); - - try { - compute().execute(GridKillTask.class, false); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public void stopNodes(Collection<UUID> ids) { - guard(); - - try { - compute(forNodeIds(ids)).execute(GridKillTask.class, false); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public void restartNodes() { - guard(); - - try { - compute().execute(GridKillTask.class, true); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public void restartNodes(Collection<UUID> ids) { - guard(); - - try { - compute(forNodeIds(ids)).execute(GridKillTask.class, true); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ @Override public boolean eventUserRecordable(int type) { guard(); @@ -2739,56 +2428,6 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit } /** {@inheritDoc} */ - @Override public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(String cacheName, - @Nullable Collection<? extends K> keys) { - if (F.isEmpty(keys)) - return Collections.emptyMap(); - - guard(); - - try { - return ctx.affinity().mapKeysToNodes(cacheName, keys); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public <K> ClusterNode mapKeyToNode(String cacheName, K key) { - A.notNull(key, "key"); - - guard(); - - try { - return ctx.affinity().mapKeyToNode(cacheName, key); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public void resetMetrics() { - guard(); - - try { - ctx.jobMetric().reset(); - ctx.io().resetMetrics(); - ctx.task().resetMetrics(); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ @Override public IgniteStreamer streamer(@Nullable String name) { guard(); @@ -2813,17 +2452,6 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit } /** {@inheritDoc} */ - @Override public ClusterGroup forLocal() { - ctx.gateway().readLock(); - - try { - return new ClusterGroupAdapter(this, ctx, null, Collections.singleton(cfg.getNodeId())); - } - finally { - ctx.gateway().readUnlock(); - } - } - /** {@inheritDoc} */ @Override public IgniteProductVersion version() { return VER; } @@ -2987,6 +2615,24 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit } /** + * <tt>ctx.gateway().readLock()</tt> + */ + private void guard() { + assert ctx != null; + + ctx.gateway().readLock(); + } + + /** + * <tt>ctx.gateway().readUnlock()</tt> + */ + private void unguard() { + assert ctx != null; + + ctx.gateway().readUnlock(); + } + + /** * Creates optional component. * * @param cls Component interface. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java index 3165f78..d6643a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a67b72ca/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java index 81d8d1e..f0f5a70 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.services.*; import org.jetbrains.annotations.*;