http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index 505204d..f33fa39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -20,9 +20,9 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; +import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.interop.*; import org.apache.ignite.internal.managers.deployment.*; -import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.continuous.*; @@ -131,41 +131,91 @@ class GridEventConsumeHandler implements GridContinuousHandler { final boolean loc = nodeId.equals(ctx.localNodeId()); lsnr = new GridLocalEventListener() { + /** node ID, routine ID, event */ + private final Queue<T3<UUID, UUID, Event>> notificationQueue = new LinkedList<>(); + + private boolean notificationInProgress; + @Override public void onEvent(Event evt) { - if (filter == null || filter.apply(evt)) { - if (loc) { - if (!cb.apply(nodeId, evt)) - ctx.continuous().stopRoutine(routineId); - } - else { - GridDiscoveryManager disco = ctx.discovery(); + if (filter != null && !filter.apply(evt)) + return; + + if (loc) { + if (!cb.apply(nodeId, evt)) + ctx.continuous().stopRoutine(routineId); + } + else { + if (ctx.discovery().node(nodeId) == null) + return; + + synchronized (notificationQueue) { + notificationQueue.add(new T3<>(nodeId, routineId, evt)); + + if (!notificationInProgress) { + ctx.getSystemExecutorService().submit(new Runnable() { + @Override public void run() { + if (!ctx.continuous().lockStopping()) + return; - ClusterNode node = disco.node(nodeId); + try { + while (true) { + T3<UUID, UUID, Event> t3; - if (node != null) { - try { - EventWrapper wrapper = new EventWrapper(evt); + synchronized (notificationQueue) { + t3 = notificationQueue.poll(); - if (evt instanceof CacheEvent) { - String cacheName = ((CacheEvent)evt).cacheName(); + if (t3 == null) { + notificationInProgress = false; - if (ctx.config().isPeerClassLoadingEnabled() && disco.cacheNode(node, cacheName)) { - wrapper.p2pMarshal(ctx.config().getMarshaller()); + return; + } + } - wrapper.cacheName = cacheName; + try { + Event evt = t3.get3(); - GridCacheDeploymentManager depMgr = - ctx.cache().internalCache(cacheName).context().deploy(); + EventWrapper wrapper = new EventWrapper(evt); - depMgr.prepare(wrapper); + if (evt instanceof CacheEvent) { + String cacheName = ((CacheEvent)evt).cacheName(); + + ClusterNode node = ctx.discovery().node(t3.get1()); + + if (node == null) + continue; + + if (ctx.config().isPeerClassLoadingEnabled() + && ctx.discovery().cacheNode(node, cacheName)) { + wrapper.p2pMarshal(ctx.config().getMarshaller()); + + wrapper.cacheName = cacheName; + + GridCacheDeploymentManager depMgr = ctx.cache() + .internalCache(cacheName).context().deploy(); + + depMgr.prepare(wrapper); + } + } + + ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, false, + false); + } + catch (ClusterTopologyCheckedException ignored) { + // No-op. + } + catch (Throwable e) { + U.error(ctx.log(GridEventConsumeHandler.class), + "Failed to send event notification to node: " + nodeId, e); + } + } + } + finally { + ctx.continuous().unlockStopping(); } } + }); - ctx.continuous().addNotification(nodeId, routineId, wrapper, null, false, false); - } - catch (IgniteCheckedException e) { - U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, e); - } + notificationInProgress = true; } } }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index ad7d562..d6542f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -552,4 +552,9 @@ public interface GridKernalContext extends Iterable<GridComponent> { * @return Marshaller context. */ public MarshallerContextImpl marshallerContext(); + + /** + * @return {@code True} if local node is client node (has flag {@link IgniteConfiguration#isClientMode()} set). + */ + public boolean clientNode(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 1ff483e..f921d49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -894,6 +894,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public boolean clientNode() { + return cfg.isClientMode(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridKernalContextImpl.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index c4b93b8..4f5e365 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.datastructures.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.job.*; import org.apache.ignite.internal.processors.jobmetrics.*; +import org.apache.ignite.internal.processors.nodevalidation.*; import org.apache.ignite.internal.processors.offheap.*; import org.apache.ignite.internal.processors.plugin.*; import org.apache.ignite.internal.processors.port.*; @@ -56,7 +57,6 @@ import org.apache.ignite.internal.processors.security.*; import org.apache.ignite.internal.processors.segmentation.*; import org.apache.ignite.internal.processors.service.*; import org.apache.ignite.internal.processors.session.*; -import org.apache.ignite.internal.processors.nodevalidation.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; @@ -169,11 +169,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** */ @GridToStringExclude - private Timer starveTimer; + private GridTimeoutProcessor.CancelableTask starveTask; /** */ @GridToStringExclude - private Timer metricsLogTimer; + private GridTimeoutProcessor.CancelableTask metricsLogTask; /** Indicate error on grid stop. */ @GridToStringExclude @@ -867,13 +867,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (starveCheck) { final long interval = F.isEmpty(intervalStr) ? PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr); - starveTimer = new Timer("ignite-starvation-checker"); - - starveTimer.scheduleAtFixedRate(new GridTimerTask() { + starveTask = ctx.timeout().schedule(new Runnable() { /** Last completed task count. */ private long lastCompletedCnt; - @Override protected void safeRun() { + @Override public void run() { if (!(execSvc instanceof ThreadPoolExecutor)) return; @@ -896,13 +894,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { long metricsLogFreq = cfg.getMetricsLogFrequency(); if (metricsLogFreq > 0) { - metricsLogTimer = new Timer("ignite-metrics-logger"); - - metricsLogTimer.scheduleAtFixedRate(new GridTimerTask() { - /** */ + metricsLogTask = ctx.timeout().schedule(new Runnable() { private final DecimalFormat dblFmt = new DecimalFormat("#.##"); - @Override protected void safeRun() { + @Override public void run() { if (log.isInfoEnabled()) { ClusterMetrics m = cluster().localNode().metrics(); @@ -963,8 +958,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { sysPoolQSize = exec.getQueue().size(); } + String id = U.id8(localNode().id()); + String msg = NL + "Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL + + " ^-- Node [id=" + id + ", name=" + name() + "]" + NL + " ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL + " ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" + dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL + @@ -1165,6 +1163,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { add(ATTR_CLIENT_MODE, cfg.isClientMode()); + add(ATTR_CONSISTENCY_CHECK_SKIPPED, getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)); + // Build a string from JVM arguments, because parameters with spaces are split. SB jvmArgs = new SB(512); @@ -1550,7 +1550,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ">>> Grid name: " + gridName + NL + ">>> Local node [" + "ID=" + locNode.id().toString().toUpperCase() + - ", order=" + locNode.order() + + ", order=" + locNode.order() + ", clientMode=" + ctx.clientNode() + "]" + NL + ">>> Local node addresses: " + U.addressesAsString(locNode) + NL + ">>> Local ports: " + sb + NL; @@ -1713,12 +1713,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (updateNtfTimer != null) updateNtfTimer.cancel(); - if (starveTimer != null) - starveTimer.cancel(); + if (starveTask != null) + starveTask.close(); - // Cancel metrics log timer. - if (metricsLogTimer != null) - metricsLogTimer.cancel(); + if (metricsLogTask != null) + metricsLogTask.close(); boolean interrupted = false; @@ -2370,7 +2369,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { try { ctx.cache().dynamicStartCache(null, cacheName, nearCfg, true).get(); - return ctx.cache().publicJCache(cacheName); + IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName); + + checkNearCacheStarted(cache); + + return cache; } catch (IgniteCheckedException e) { throw CU.convertToCacheException(e); @@ -2397,7 +2400,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false).get(); } - return ctx.cache().publicJCache(cacheName); + IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName); + + checkNearCacheStarted(cache); + + return cache; } catch (IgniteCheckedException e) { throw CU.convertToCacheException(e); @@ -2407,6 +2414,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } } + /** + * @param cache Cache. + */ + private void checkNearCacheStarted(IgniteCacheProxy<?, ?> cache) { + if (!cache.context().isNear()) + throw new IgniteException("Failed to start near cache " + + "(a cache with the same name without near cache is already started)"); + } + /** {@inheritDoc} */ @Override public void destroyCache(String cacheName) { guard(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java index 98cc3a7..928db5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java @@ -126,9 +126,12 @@ public final class IgniteNodeAttributes { /** Security subject for authenticated node. */ public static final String ATTR_SECURITY_SUBJECT = ATTR_PREFIX + ".security.subject"; - /** Cache interceptors. */ + /** Client mode flag. */ public static final String ATTR_CLIENT_MODE = ATTR_PREFIX + ".cache.client"; + /** Configuration consistency check disabled flag. */ + public static final String ATTR_CONSISTENCY_CHECK_SKIPPED = ATTR_PREFIX + ".consistency.check.skipped"; + /** * Enforces singleton. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index d54e06f..5cbe377 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -121,11 +121,7 @@ public class IgnitionEx { }; /** */ - private static ThreadLocal<Boolean> clientMode = new ThreadLocal<Boolean>() { - @Override protected Boolean initialValue() { - return null; - } - }; + private static ThreadLocal<Boolean> clientMode = new ThreadLocal<>(); /** * Checks runtime version to be 1.7.x or 1.8.x. @@ -196,7 +192,7 @@ public class IgnitionEx { * @return Client mode flag. */ public static boolean isClientMode() { - return clientMode.get(); + return clientMode.get() == null ? false : clientMode.get(); } /** @@ -1458,8 +1454,9 @@ public class IgnitionEx { DFLT_PUBLIC_KEEP_ALIVE_TIME, new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP)); - // Pre-start all threads as they are guaranteed to be needed. - ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads(); + if (!myCfg.isClientMode()) + // Pre-start all threads as they are guaranteed to be needed. + ((ThreadPoolExecutor)execSvc).prestartAllCoreThreads(); // Note that since we use 'LinkedBlockingQueue', number of // maximum threads has no effect. @@ -1471,7 +1468,7 @@ public class IgnitionEx { new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP)); // Pre-start all threads as they are guaranteed to be needed. - ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads(); + ((ThreadPoolExecutor)sysExecSvc).prestartAllCoreThreads(); // Note that since we use 'LinkedBlockingQueue', number of // maximum threads has no effect. @@ -1764,20 +1761,14 @@ public class IgnitionEx { public void initializeDefaultCacheConfiguration(IgniteConfiguration cfg) throws IgniteCheckedException { List<CacheConfiguration> cacheCfgs = new ArrayList<>(); - boolean clientDisco = cfg.getDiscoverySpi() instanceof TcpClientDiscoverySpi; - - // Add marshaller and utility caches. - if (!clientDisco) { - cacheCfgs.add(marshallerSystemCache()); + cacheCfgs.add(marshallerSystemCache()); - cacheCfgs.add(utilitySystemCache()); - } + cacheCfgs.add(utilitySystemCache()); if (IgniteComponentType.HADOOP.inClassPath()) cacheCfgs.add(CU.hadoopSystemCache()); - if (cfg.getAtomicConfiguration() != null && !clientDisco) - cacheCfgs.add(atomicsSystemCache(cfg.getAtomicConfiguration())); + cacheCfgs.add(atomicsSystemCache(cfg.getAtomicConfiguration())); CacheConfiguration[] userCaches = cfg.getCacheConfiguration(); @@ -1854,7 +1845,7 @@ public class IgnitionEx { if (cfg.getSwapSpaceSpi() == null) { boolean needSwap = false; - if (cfg.getCacheConfiguration() != null) { + if (cfg.getCacheConfiguration() != null && !Boolean.TRUE.equals(cfg.isClientMode())) { for (CacheConfiguration c : cfg.getCacheConfiguration()) { if (c.isSwapEnabled()) { needSwap = true; @@ -2005,7 +1996,6 @@ public class IgnitionEx { ccfg.setWriteSynchronizationMode(FULL_SYNC); ccfg.setCacheMode(cfg.getCacheMode()); ccfg.setNodeFilter(CacheConfiguration.ALL_NODES); - ccfg.setNearConfiguration(new NearCacheConfiguration()); if (cfg.getCacheMode() == PARTITIONED) ccfg.setBackups(cfg.getBackups()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java index e7ee2ff..2ec9825 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.marshaller.*; import org.jsr166.*; @@ -52,10 +53,29 @@ public abstract class MarshallerContextAdapter implements MarshallerContext { Enumeration<URL> urls = ldr.getResources(CLS_NAMES_FILE); - while (urls.hasMoreElements()) + boolean foundClsNames = false; + + while (urls.hasMoreElements()) { processResource(urls.nextElement()); - processResource(ldr.getResource(JDK_CLS_NAMES_FILE)); + foundClsNames = true; + } + + if (!foundClsNames) + throw new IgniteException("Failed to load class names properties file packaged with ignite binaries " + + "[file=" + CLS_NAMES_FILE + ", ldr=" + ldr + ']'); + + URL jdkClsNames = ldr.getResource(JDK_CLS_NAMES_FILE); + + if (jdkClsNames == null) + throw new IgniteException("Failed to load class names properties file packaged with ignite binaries " + + "[file=" + JDK_CLS_NAMES_FILE + ", ldr=" + ldr + ']'); + + processResource(jdkClsNames); + + checkHasClassName(GridDhtPartitionFullMap.class.getName(), ldr, CLS_NAMES_FILE); + checkHasClassName(GridDhtPartitionMap.class.getName(), ldr, CLS_NAMES_FILE); + checkHasClassName(HashMap.class.getName(), ldr, JDK_CLS_NAMES_FILE); } catch (IOException e) { throw new IllegalStateException("Failed to initialize marshaller context.", e); @@ -63,6 +83,18 @@ public abstract class MarshallerContextAdapter implements MarshallerContext { } /** + * @param clsName Class name. + * @param ldr Class loader used to get properties file. + * @param fileName File name. + */ + private void checkHasClassName(String clsName, ClassLoader ldr, String fileName) { + if (!map.containsKey(clsName.hashCode())) + throw new IgniteException("Failed to read class name from class names properties file. " + + "Make sure class names properties file packaged with ignite binaries is not corrupted " + + "[clsName=" + clsName + ", fileName=" + fileName + ", ldr=" + ldr + ']'); + } + + /** * @param url Resource URL. * @throws IOException In case of error. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index f964bec..4b0251d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -59,7 +59,7 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { ctx.cache().marshallerCache().context().continuousQueries().executeInternalQuery( new ContinuousQueryListener(log, workDir), null, - true, + ctx.cache().marshallerCache().context().affinityNode(), true ); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java index ee32692..779b54d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java @@ -22,19 +22,17 @@ import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.util.typedef.internal.*; -import java.io.*; - /** * Custom event. */ public class DiscoveryCustomEvent extends DiscoveryEvent { /** */ private static final long serialVersionUID = 0L; - + /** * Built-in event type: custom event sent. * <br> - * Generated when someone invoke {@link GridDiscoveryManager#sendCustomEvent(Serializable)}. + * Generated when someone invoke {@link GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage)}. * <p> * * @see DiscoveryCustomEvent @@ -42,7 +40,7 @@ public class DiscoveryCustomEvent extends DiscoveryEvent { public static final int EVT_DISCOVERY_CUSTOM_EVT = 18; /** */ - private Serializable data; + private DiscoveryCustomMessage customMsg; /** Affinity topology version. */ private AffinityTopologyVersion affTopVer; @@ -57,15 +55,15 @@ public class DiscoveryCustomEvent extends DiscoveryEvent { /** * @return Data. */ - public Serializable data() { - return data; + public DiscoveryCustomMessage customMessage() { + return customMsg; } /** - * @param data New data. + * @param customMsg New customMessage. */ - public void data(Serializable data) { - this.data = data; + public void customMessage(DiscoveryCustomMessage customMsg) { + this.customMsg = customMsg; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java index 11af716..6a6f22a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java @@ -73,6 +73,7 @@ public class IgfsMarshaller { } /** + * Serializes the message and sends it into the given output stream. * @param msg Message. * @param hdr Message header. * @param out Output. @@ -119,6 +120,7 @@ public class IgfsMarshaller { IgfsPathControlRequest req = (IgfsPathControlRequest)msg; + U.writeString(out, req.userName()); writePath(out, req.path()); writePath(out, req.destinationPath()); out.writeBoolean(req.flag()); @@ -236,6 +238,7 @@ public class IgfsMarshaller { case OPEN_CREATE: { IgfsPathControlRequest req = new IgfsPathControlRequest(); + req.userName(U.readString(in)); req.path(readPath(in)); req.destinationPath(readPath(in)); req.flag(in.readBoolean()); @@ -298,8 +301,6 @@ public class IgfsMarshaller { } } - assert msg != null; - msg.command(cmd); return msg; @@ -341,34 +342,4 @@ public class IgfsMarshaller { return null; } - - /** - * Writes string to output. - * - * @param out Data output. - * @param str String. - * @throws IOException If write failed. - */ - private void writeString(DataOutput out, @Nullable String str) throws IOException { - out.writeBoolean(str != null); - - if (str != null) - out.writeUTF(str); - } - - /** - * Reads string from input. - * - * @param in Data input. - * @return Read string. - * @throws IOException If read failed. - */ - @Nullable private String readString(DataInput in) throws IOException { - boolean hasStr = in.readBoolean(); - - if (hasStr) - return in.readUTF(); - - return null; - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java index 7ed1619..2f6e6e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.igfs.common; import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -63,6 +64,9 @@ public class IgfsPathControlRequest extends IgfsMessage { /** Last modification time. */ private long modificationTime; + /** The user name this control request is made on behalf of. */ + private String userName; + /** * @param path Path. */ @@ -235,4 +239,22 @@ public class IgfsPathControlRequest extends IgfsMessage { @Override public String toString() { return S.toString(IgfsPathControlRequest.class, this, "cmd", command()); } + + /** + * Getter for the user name. + * @return user name. + */ + public final String userName() { + assert userName != null; + + return userName; + } + + /** + * Setter for the user name. + * @param userName the user name. + */ + public final void userName(String userName) { + this.userName = IgfsUtils.fixUserName(userName); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java index 77cd2a0..96639cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java @@ -48,7 +48,7 @@ public class InteropIgnition { * @return Ignite instance. */ public static synchronized InteropProcessor start(@Nullable String springCfgPath, @Nullable String gridName, - int factoryId, long envPtr) { + int factoryId, long envPtr) { IgniteConfiguration cfg = configuration(springCfgPath); if (gridName != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index c93c059..bea4256 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -23,7 +23,7 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; -import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -31,7 +31,7 @@ import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.plugin.security.*; import org.apache.ignite.spi.*; -import org.apache.ignite.spi.swapspace.*; + import org.jetbrains.annotations.*; import javax.cache.expiry.*; @@ -439,46 +439,10 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan return ctx.cache().cache(cacheName).containsKey(key); } - @Override public void writeToSwap(String spaceName, Object key, @Nullable Object val, - @Nullable ClassLoader ldr) { - assert ctx.swap().enabled(); - - try { - ctx.swap().write(spaceName, key, val, ldr); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - @SuppressWarnings({"unchecked"}) - @Nullable @Override public <T> T readFromSwap(String spaceName, SwapKey key, - @Nullable ClassLoader ldr) { - try { - assert ctx.swap().enabled(); - - return ctx.swap().readValue(spaceName, key, ldr); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - @Override public int partition(String cacheName, Object key) { return ctx.cache().cache(cacheName).affinity().partition(key); } - @Override public void removeFromSwap(String spaceName, Object key, @Nullable ClassLoader ldr) { - try { - assert ctx.swap().enabled(); - - ctx.swap().remove(spaceName, key, null, ldr); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - @Override public IgniteNodeValidationResult validateNode(ClusterNode node) { for (GridComponent comp : ctx) { IgniteNodeValidationResult err = comp.validateNode(node); @@ -508,26 +472,6 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan } } - @SuppressWarnings("unchecked") - @Nullable @Override public <V> V readValueFromOffheapAndSwap(@Nullable String spaceName, - Object key, @Nullable ClassLoader ldr) { - try { - IgniteInternalCache<Object, V> cache = ctx.cache().cache(spaceName); - - GridCacheContext cctx = cache.context(); - - if (cctx.isNear()) - cctx = cctx.near().dht().context(); - - GridCacheSwapEntry e = cctx.swap().read(cctx.toCacheKeyObject(key), true, true); - - return e != null ? CU.<V>value(e.value(), cctx, true) : null; - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - @Override public MessageFormatter messageFormatter() { return ctx.io().formatter(); } @@ -540,6 +484,14 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan return ctx.discovery().tryFailNode(nodeId); } + @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) { + ctx.timeout().addTimeoutObject(new GridSpiTimeoutObject(obj)); + } + + @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) { + ctx.timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj)); + } + /** * @param e Exception to handle. * @return GridSpiException Converted exception. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java index 2e80b6f..ce2a36c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java @@ -56,11 +56,10 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { private final GridMessageListener lsnr = new CheckpointRequestListener(); /** */ - private final ConcurrentMap<IgniteUuid, CheckpointSet> keyMap = new ConcurrentHashMap8<>(); + private final ConcurrentMap<IgniteUuid, CheckpointSet> keyMap; /** */ - private final Collection<IgniteUuid> closedSess = new GridBoundedConcurrentLinkedHashSet<>( - MAX_CLOSED_SESS, MAX_CLOSED_SESS, 0.75f, 256, PER_SEGMENT_Q); + private final Collection<IgniteUuid> closedSess; /** Grid marshaller. */ private final Marshaller marsh; @@ -72,6 +71,21 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { super(ctx, ctx.config().getCheckpointSpi()); marsh = ctx.config().getMarshaller(); + + if (enabled()) { + keyMap = new ConcurrentHashMap8<>(); + + closedSess = new GridBoundedConcurrentLinkedHashSet<>(MAX_CLOSED_SESS, + MAX_CLOSED_SESS, + 0.75f, + 256, + PER_SEGMENT_Q); + } + else { + keyMap = null; + + closedSess = null; + } } /** {@inheritDoc} */ @@ -112,7 +126,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { * @return Session IDs. */ public Collection<IgniteUuid> sessionIds() { - return new ArrayList<>(keyMap.keySet()); + return enabled() ? new ArrayList<>(keyMap.keySet()) : Collections.<IgniteUuid>emptyList(); } /** @@ -125,8 +139,17 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { * @return {@code true} if checkpoint has been actually saved, {@code false} otherwise. * @throws IgniteCheckedException Thrown in case of any errors. */ - public boolean storeCheckpoint(GridTaskSessionInternal ses, String key, Object state, ComputeTaskSessionScope scope, - long timeout, boolean override) throws IgniteCheckedException { + public boolean storeCheckpoint(GridTaskSessionInternal ses, + String key, + Object state, + ComputeTaskSessionScope scope, + long timeout, + boolean override) + throws IgniteCheckedException + { + if (!enabled()) + return false; + assert ses != null; assert key != null; @@ -239,6 +262,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { * @return Whether or not checkpoint was removed. */ public boolean removeCheckpoint(String key) { + if (!enabled()) + return false; + assert key != null; boolean rmv = false; @@ -256,6 +282,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { * @return Whether or not checkpoint was removed. */ public boolean removeCheckpoint(GridTaskSessionInternal ses, String key) { + if (!enabled()) + return false; + assert ses != null; assert key != null; @@ -283,6 +312,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { * @throws IgniteCheckedException Thrown in case of any errors. */ @Nullable public Serializable loadCheckpoint(GridTaskSessionInternal ses, String key) throws IgniteCheckedException { + if (!enabled()) + return null; + assert ses != null; assert key != null; @@ -309,6 +341,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { * @param cleanup Whether cleanup or not. */ public void onSessionEnd(GridTaskSessionInternal ses, boolean cleanup) { + if (!enabled()) + return; + closedSess.add(ses.getId()); // If on task node. @@ -358,7 +393,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { @Override public void printMemoryStats() { X.println(">>>"); X.println(">>> Checkpoint manager memory stats [grid=" + ctx.gridName() + ']'); - X.println(">>> keyMap: " + keyMap.size()); + X.println(">>> keyMap: " + (keyMap != null ? keyMap.size() : 0)); } /** @@ -407,6 +442,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { if (log.isDebugEnabled()) log.debug("Received checkpoint request: " + req); + if (!enabled()) + return; + IgniteUuid sesId = req.getSessionId(); if (closedSess.contains(sesId)) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index c877d57..4382731 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1211,6 +1211,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa public void addUserMessageListener(@Nullable final Object topic, @Nullable final IgniteBiPredicate<UUID, ?> p) { if (p != null) { try { + if (p instanceof GridLifecycleAwareMessageFilter) + ((GridLifecycleAwareMessageFilter)p).initialize(ctx); + else + ctx.resource().injectGeneric(p); + addMessageListener(TOPIC_COMM_USER, new GridUserMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p)); } @@ -1695,13 +1700,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa throws IgniteCheckedException { this.topic = topic; this.predLsnr = predLsnr; - - if (predLsnr != null) { - if (predLsnr instanceof GridLifecycleAwareMessageFilter) - ((GridLifecycleAwareMessageFilter)predLsnr).initialize(ctx); - else - ctx.resource().injectGeneric(predLsnr); - } } /** {@inheritDoc} */ @@ -1724,69 +1722,84 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return; } - Object msgBody = ioMsg.body(); - - assert msgBody != null || ioMsg.bodyBytes() != null; + busyLock.readLock(); try { - byte[] msgTopicBytes = ioMsg.topicBytes(); - - Object msgTopic = ioMsg.topic(); - - GridDeployment dep = ioMsg.deployment(); - - if (dep == null && ctx.config().isPeerClassLoadingEnabled() && - ioMsg.deploymentClassName() != null) { - dep = ctx.deploy().getGlobalDeployment( - ioMsg.deploymentMode(), - ioMsg.deploymentClassName(), - ioMsg.deploymentClassName(), - ioMsg.userVersion(), - nodeId, - ioMsg.classLoaderId(), - ioMsg.loaderParticipants(), - null); - - if (dep == null) - throw new IgniteDeploymentCheckedException( - "Failed to obtain deployment information for user message. " + - "If you are using custom message or topic class, try implementing " + - "GridPeerDeployAware interface. [msg=" + ioMsg + ']'); - - ioMsg.deployment(dep); // Cache deployment. + if (stopping) { + if (log.isDebugEnabled()) + log.debug("Received user message while stopping (will ignore) [nodeId=" + + nodeId + ", msg=" + msg + ']'); + + return; } - // Unmarshall message topic if needed. - if (msgTopic == null && msgTopicBytes != null) { - msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null); + Object msgBody = ioMsg.body(); - ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings. - } + assert msgBody != null || ioMsg.bodyBytes() != null; - if (!F.eq(topic, msgTopic)) - return; + try { + byte[] msgTopicBytes = ioMsg.topicBytes(); + + Object msgTopic = ioMsg.topic(); + + GridDeployment dep = ioMsg.deployment(); + + if (dep == null && ctx.config().isPeerClassLoadingEnabled() && + ioMsg.deploymentClassName() != null) { + dep = ctx.deploy().getGlobalDeployment( + ioMsg.deploymentMode(), + ioMsg.deploymentClassName(), + ioMsg.deploymentClassName(), + ioMsg.userVersion(), + nodeId, + ioMsg.classLoaderId(), + ioMsg.loaderParticipants(), + null); + + if (dep == null) + throw new IgniteDeploymentCheckedException( + "Failed to obtain deployment information for user message. " + + "If you are using custom message or topic class, try implementing " + + "GridPeerDeployAware interface. [msg=" + ioMsg + ']'); + + ioMsg.deployment(dep); // Cache deployment. + } - if (msgBody == null) { - msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null); + // Unmarshall message topic if needed. + if (msgTopic == null && msgTopicBytes != null) { + msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null); - ioMsg.body(msgBody); // Save body to avoid future unmarshallings. - } + ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings. + } - // Resource injection. - if (dep != null) - ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" + - msg + ']', e); - } + if (!F.eq(topic, msgTopic)) + return; + + if (msgBody == null) { + msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null); - if (msgBody != null) { - if (predLsnr != null) { - if (!predLsnr.apply(nodeId, msgBody)) - removeMessageListener(TOPIC_COMM_USER, this); + ioMsg.body(msgBody); // Save body to avoid future unmarshallings. + } + + // Resource injection. + if (dep != null) + ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" + + msg + ']', e); + } + + if (msgBody != null) { + if (predLsnr != null) { + if (!predLsnr.apply(nodeId, msgBody)) + removeMessageListener(TOPIC_COMM_USER, this); + } } } + finally { + busyLock.readUnlock(); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java new file mode 100644 index 0000000..2005d4e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import org.apache.ignite.cluster.*; + +/** + * Listener interface. + */ +public interface CustomEventListener<T extends DiscoveryCustomMessage> { + /** + * @param snd Sender. + * @param msg Message. + */ + public void onCustomEvent(ClusterNode snd, T msg); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java new file mode 100644 index 0000000..23f8bda --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import org.apache.ignite.spi.discovery.*; +import org.jetbrains.annotations.*; + +/** + * + */ +class CustomMessageWrapper implements DiscoverySpiCustomMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final DiscoveryCustomMessage delegate; + + /** + * @param delegate Delegate. + */ + CustomMessageWrapper(DiscoveryCustomMessage delegate) { + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { + DiscoveryCustomMessage res = delegate.ackMessage(); + + return res == null ? null : new CustomMessageWrapper(res); + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return delegate.isMutable(); + } + + /** + * @return Delegate. + */ + public DiscoveryCustomMessage delegate() { + return delegate; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return delegate.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java new file mode 100644 index 0000000..401486d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * + */ +public interface DiscoveryCustomMessage extends Serializable { + /** + * @return Unique custom message ID. + */ + public IgniteUuid id(); + + /** + * Whether or not minor version of topology should be increased on message receive. + * + * @return {@code true} if minor topology version should be increased. + * @see AffinityTopologyVersion#minorTopVer + */ + public boolean incrementMinorTopologyVersion(); + + /** + * Called when custom message has been handled by all nodes. + * + * @return Ack message or {@code null} if ack is not required. + */ + @Nullable public DiscoveryCustomMessage ackMessage(); + + /** + * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes. + */ + public boolean isMutable(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 0950774..71fbc61 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.jobmetrics.*; import org.apache.ignite.internal.processors.security.*; +import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; @@ -165,10 +166,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private final GridLocalMetrics metrics = createMetrics(); /** Metrics update worker. */ - private final MetricsUpdater metricsUpdater = new MetricsUpdater(); + private GridTimeoutProcessor.CancelableTask metricsUpdateTask; /** Custom event listener. */ - private GridPlainInClosure<Serializable> customEvtLsnr; + private ConcurrentMap<Class<?>, List<CustomEventListener<DiscoveryCustomMessage>>> customEvtLsnrs = + new ConcurrentHashMap8<>(); /** Map of dynamic cache filters. */ private Map<String, CachePredicate> registeredCaches = new HashMap<>(); @@ -176,6 +178,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** */ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + /** Received custom messages history. */ + private final ArrayDeque<IgniteUuid> rcvdCustomMsgs = new ArrayDeque<>(); + /** @param ctx Context. */ public GridDiscoveryManager(GridKernalContext ctx) { super(ctx, ctx.config().getDiscoverySpi()); @@ -214,6 +219,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * * @param cacheName Cache name. * @param filter Cache filter. + * @param nearEnabled Near enabled flag. * @param loc {@code True} if cache is local. */ public void setCacheFilter( @@ -240,12 +246,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * * @param cacheName Cache name. * @param clientNodeId Near node ID. + * @param nearEnabled Near enabled flag. */ public void addClientNode(String cacheName, UUID clientNodeId, boolean nearEnabled) { - CachePredicate predicate = registeredCaches.get(cacheName); + CachePredicate pred = registeredCaches.get(cacheName); - if (predicate != null) - predicate.addClientNode(clientNodeId, nearEnabled); + if (pred != null) + pred.addClientNode(clientNodeId, nearEnabled); } /** @@ -279,17 +286,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } - /** - * @param evtType Event type. - * @return Next affinity topology version. - */ - private AffinityTopologyVersion nextTopologyVersion(int evtType, long topVer) { - if (evtType == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) - minorTopVer++; - else if (evtType != EVT_NODE_METRICS_UPDATED) - minorTopVer = 0; - - return new AffinityTopologyVersion(topVer, minorTopVer); + /** {@inheritDoc} */ + @Override protected void onKernalStart0() throws IgniteCheckedException { + if (Boolean.TRUE.equals(ctx.config().isClientMode()) && !getSpi().isClientMode()) + ctx.performance().add("Enable client mode for TcpDiscoverySpi " + + "(set TcpDiscoverySpi.forceServerMode to false)"); } /** {@inheritDoc} */ @@ -328,7 +329,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { checkSegmentOnStart(); } - new IgniteThread(metricsUpdater).start(); + metricsUpdateTask = ctx.timeout().schedule(new MetricsUpdater(), METRICS_UPDATE_FREQ, METRICS_UPDATE_FREQ); spi.setMetricsProvider(createMetricsProvider()); @@ -356,14 +357,41 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ClusterNode node, Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> snapshots, - @Nullable Serializable data + @Nullable DiscoverySpiCustomMessage spiCustomMsg ) { + DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null + : ((CustomMessageWrapper)spiCustomMsg).delegate(); + + if (skipMessage(type, customMsg)) + return; + final ClusterNode locNode = localNode(); if (snapshots != null) topHist = snapshots; - AffinityTopologyVersion nextTopVer = nextTopologyVersion(type, topVer); + boolean verChanged; + + if (type == EVT_NODE_METRICS_UPDATED) + verChanged = false; + else if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { + assert customMsg != null; + + if (customMsg.incrementMinorTopologyVersion()) { + minorTopVer++; + + verChanged = true; + } + else + verChanged = false; + } + else { + minorTopVer = 0; + + verChanged = true; + } + + AffinityTopologyVersion nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer); if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT) { for (DiscoCache c : discoCacheHist.values()) @@ -373,19 +401,26 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { - try { - if (customEvtLsnr != null) - customEvtLsnr.apply(data); - } - catch (Exception e) { - U.error(log, "Failed to notify direct custom event listener: " + data, e); + for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) { + List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls); + + if (list != null) { + for (CustomEventListener<DiscoveryCustomMessage> lsnr : list) { + try { + lsnr.onCustomEvent(node, customMsg); + } + catch (Exception e) { + U.error(log, "Failed to notify direct custom event listener: " + customMsg, e); + } + } + } } } // Put topology snapshot into discovery history. // There is no race possible between history maintenance and concurrent discovery // event notifications, since SPI notifies manager about all events from this listener. - if (type != EVT_NODE_METRICS_UPDATED) { + if (verChanged) { DiscoCache cache = new DiscoCache(locNode, F.view(topSnapshot, F.remoteNodes(locNode.id()))); discoCacheHist.put(nextTopVer, cache); @@ -417,7 +452,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { return; } - discoWrk.addEvent(type, nextTopVer, node, topSnapshot, data); + discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg); } }); @@ -486,10 +521,43 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * @param customEvtLsnr Custom event listener. + * @param type Message type. + * @param customMsg Custom message. + * @return {@code True} if should not process message. */ - public void setCustomEventListener(GridPlainInClosure<Serializable> customEvtLsnr) { - this.customEvtLsnr = customEvtLsnr; + private boolean skipMessage(int type, @Nullable DiscoveryCustomMessage customMsg) { + if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { + assert customMsg != null && customMsg.id() != null : customMsg; + + if (rcvdCustomMsgs.contains(customMsg.id())) { + if (log.isDebugEnabled()) + log.debug("Received duplicated custom message, will ignore [msg=" + customMsg + "]"); + + return true; + } + + rcvdCustomMsgs.addLast(customMsg.id()); + + while (rcvdCustomMsgs.size() > DISCOVERY_HISTORY_SIZE) + rcvdCustomMsgs.pollFirst(); + } + + return false; + } + + /** + * @param msgCls Message class. + * @param lsnr Custom event listener. + */ + public <T extends DiscoveryCustomMessage> void setCustomEventListener(Class<T> msgCls, CustomEventListener<T> lsnr) { + List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(msgCls); + + if (list == null) { + list = F.addIfAbsent(customEvtLsnrs, msgCls, + new CopyOnWriteArrayList<CustomEventListener<DiscoveryCustomMessage>>()); + } + + list.add((CustomEventListener<DiscoveryCustomMessage>)lsnr); } /** @@ -660,7 +728,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Map<Integer, CacheMetrics> metrics = null; for (GridCacheAdapter<?, ?> cache : caches) { - if (cache.context().started() && cache.configuration().isStatisticsEnabled()) { + if (cache.configuration().isStatisticsEnabled() && + cache.context().started() && + cache.context().affinity().affinityTopologyVersion().topologyVersion() > 0) { if (metrics == null) metrics = U.newHashMap(caches.size()); @@ -952,11 +1022,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { getSpi().setListener(null); // Stop discovery worker and metrics updater. + U.closeQuiet(metricsUpdateTask); + U.cancel(discoWrk); - U.cancel(metricsUpdater); U.join(discoWrk, log); - U.join(metricsUpdater, log); // Stop SPI itself. stopSpi(); @@ -1218,13 +1288,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Gets alive remote nodes with at least one cache configured. + * Gets alive remote server nodes with at least one cache configured. * * @param topVer Topology version (maximum allowed node order). * @return Collection of alive cache nodes. */ - public Collection<ClusterNode> aliveRemoteNodesWithCaches(AffinityTopologyVersion topVer) { - return resolveDiscoCache(null, topVer).aliveRemoteNodesWithCaches(topVer.topologyVersion()); + public Collection<ClusterNode> aliveRemoteServerNodesWithCaches(AffinityTopologyVersion topVer) { + return resolveDiscoCache(null, topVer).aliveRemoteServerNodesWithCaches(topVer.topologyVersion()); + } + + /** + * Gets alive server nodes with at least one cache configured. + * + * @param topVer Topology version (maximum allowed node order). + * @return Collection of alive cache nodes. + */ + public Collection<ClusterNode> aliveServerNodesWithCaches(AffinityTopologyVersion topVer) { + return resolveDiscoCache(null, topVer).aliveServerNodesWithCaches(topVer.topologyVersion()); } /** @@ -1256,9 +1336,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if node is a cache data node. */ public boolean cacheAffinityNode(ClusterNode node, String cacheName) { - CachePredicate predicate = registeredCaches.get(cacheName); + CachePredicate pred = registeredCaches.get(cacheName); - return predicate != null && predicate.dataNode(node); + return pred != null && pred.dataNode(node); } /** @@ -1267,9 +1347,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if node has near cache enabled. */ public boolean cacheNearNode(ClusterNode node, String cacheName) { - CachePredicate predicate = registeredCaches.get(cacheName); + CachePredicate pred = registeredCaches.get(cacheName); - return predicate != null && predicate.nearNode(node); + return pred != null && pred.nearNode(node); } /** @@ -1278,9 +1358,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if node has client cache (without near cache). */ public boolean cacheClientNode(ClusterNode node, String cacheName) { - CachePredicate predicate = registeredCaches.get(cacheName); + CachePredicate pred = registeredCaches.get(cacheName); - return predicate != null && predicate.clientNode(node); + return pred != null && pred.clientNode(node); } /** @@ -1289,9 +1369,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return If cache with the given name is accessible on the given node. */ public boolean cacheNode(ClusterNode node, String cacheName) { - CachePredicate predicate = registeredCaches.get(cacheName); + CachePredicate pred = registeredCaches.get(cacheName); - return predicate != null && predicate.cacheNode(node); + return pred != null && pred.cacheNode(node); } /** @@ -1384,10 +1464,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * @param evt Event. + * @param msg Custom message. */ - public void sendCustomEvent(Serializable evt) { - getSpi().sendCustomEvent(evt); + public void sendCustomEvent(DiscoveryCustomMessage msg) { + getSpi().sendCustomEvent(new CustomMessageWrapper(msg)); } /** @@ -1542,8 +1622,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** Worker for discovery events. */ private class DiscoveryWorker extends GridWorker { /** Event queue. */ - private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, Serializable>> evts = - new LinkedBlockingQueue<>(); + private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, + DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>(); /** Node segmented event fired flag. */ private boolean nodeSegFired; @@ -1609,9 +1689,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { AffinityTopologyVersion topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, - @Nullable Serializable data + @Nullable DiscoveryCustomMessage data ) { - assert node != null; + assert node != null : data; evts.add(F.t(type, topVer, node, topSnapshot, data)); } @@ -1650,7 +1730,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** @throws InterruptedException If interrupted. */ @SuppressWarnings("DuplicateCondition") private void body0() throws InterruptedException { - GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, Serializable> evt = evts.take(); + GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, + DiscoveryCustomMessage> evt = evts.take(); int type = evt.get1(); @@ -1768,7 +1849,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { customEvt.type(type); customEvt.topologySnapshot(topVer.topologyVersion(), null); customEvt.affinityTopologyVersion(topVer); - customEvt.data(evt.get5()); + customEvt.customMessage(evt.get5()); ctx.event().record(customEvt); } @@ -1833,28 +1914,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * */ - private class MetricsUpdater extends GridWorker { + private class MetricsUpdater implements Runnable { /** */ private long prevGcTime = -1; /** */ private long prevCpuTime = -1; - /** - * - */ - private MetricsUpdater() { - super(ctx.gridName(), "metrics-updater", GridDiscoveryManager.this.log); - } - /** {@inheritDoc} */ - @Override protected void body() throws IgniteInterruptedCheckedException { - while (!isCancelled()) { - U.sleep(METRICS_UPDATE_FREQ); - - gcCpuLoad = getGcCpuLoad(); - cpuLoad = getCpuLoad(); - } + @Override public void run() { + gcCpuLoad = getGcCpuLoad(); + cpuLoad = getCpuLoad(); } /** @@ -2065,9 +2135,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private final Collection<ClusterNode> aliveNodesWithCaches; /** - * Cached alive remote nodes with caches. + * Cached alive server remote nodes with caches. + */ + private final Collection<ClusterNode> aliveSrvNodesWithCaches; + + /** + * Cached alive remote server nodes with caches. */ - private final Collection<ClusterNode> aliveRmtNodesWithCaches; + private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches; /** * @param loc Local node. @@ -2088,21 +2163,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { all.addAll(rmtNodes); + Collections.sort(all, GridNodeOrderComparator.INSTANCE); + allNodes = Collections.unmodifiableList(all); - Map<String, Collection<ClusterNode>> cacheMap = - new HashMap<>(allNodes.size(), 1.0f); - Map<String, Collection<ClusterNode>> rmtCacheMap = - new HashMap<>(allNodes.size(), 1.0f); - Map<String, Collection<ClusterNode>> dhtNodesMap = - new HashMap<>(allNodes.size(), 1.0f); + Map<String, Collection<ClusterNode>> cacheMap = new HashMap<>(allNodes.size(), 1.0f); + Map<String, Collection<ClusterNode>> rmtCacheMap = new HashMap<>(allNodes.size(), 1.0f); + Map<String, Collection<ClusterNode>> dhtNodesMap =new HashMap<>(allNodes.size(), 1.0f); Collection<ClusterNode> nodesWithCaches = new HashSet<>(allNodes.size()); Collection<ClusterNode> rmtNodesWithCaches = new HashSet<>(allNodes.size()); aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); aliveNodesWithCaches = new ConcurrentSkipListSet<>(); - aliveRmtNodesWithCaches = new ConcurrentSkipListSet<>(); + aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>(); + aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>(); nodesByVer = new TreeMap<>(); long maxOrder0 = 0; @@ -2154,8 +2229,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (alive(node.id())) { aliveNodesWithCaches.add(node); - if (!loc.id().equals(node.id())) - aliveRmtNodesWithCaches.add(node); + if (!CU.clientNode(node)) { + aliveSrvNodesWithCaches.add(node); + + if (!loc.id().equals(node.id())) + aliveRmtSrvNodesWithCaches.add(node); + } } } @@ -2240,13 +2319,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * @return All nodes with at least one cache configured. - */ - Collection<ClusterNode> allNodesWithCaches() { - return allNodesWithCaches; - } - - /** * Gets collection of nodes which have version equal or greater than {@code ver}. * * @param ver Version to check. @@ -2345,13 +2417,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Gets all alive remote nodes with at least one cache configured. + * Gets all alive remote server nodes with at least one cache configured. * * @param topVer Topology version. * @return Collection of nodes. */ - Collection<ClusterNode> aliveRemoteNodesWithCaches(final long topVer) { - return filter(topVer, aliveRmtNodesWithCaches); + Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) { + return filter(topVer, aliveRmtSrvNodesWithCaches); + } + + /** + * Gets all alive server nodes with at least one cache configured. + * + * @param topVer Topology version. + * @return Collection of nodes. + */ + Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) { + return filter(topVer, aliveSrvNodesWithCaches); } /** @@ -2388,7 +2470,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { filterNodeMap(aliveRmtCacheNodes, leftNode); aliveNodesWithCaches.remove(leftNode); - aliveRmtNodesWithCaches.remove(leftNode); + aliveSrvNodesWithCaches.remove(leftNode); + aliveRmtSrvNodesWithCaches.remove(leftNode); } /** @@ -2480,11 +2563,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private boolean loc; /** Collection of client near nodes. */ - private Map<UUID, Boolean> clientNodes; + private ConcurrentHashMap<UUID, Boolean> clientNodes; /** * @param cacheFilter Cache filter. * @param nearEnabled Near enabled flag. + * @param loc {@code True} if cache is local. */ private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, boolean loc) { assert cacheFilter != null; @@ -2498,9 +2582,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * @param nodeId Near node ID to add. + * @param nearEnabled Near enabled flag. */ public void addClientNode(UUID nodeId, boolean nearEnabled) { - clientNodes.put(nodeId, nearEnabled); + clientNodes.putIfAbsent(nodeId, nearEnabled); } /** @@ -2515,7 +2600,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if this node is a data node for given cache. */ public boolean dataNode(ClusterNode node) { - return !node.isDaemon() && cacheFilter.apply(node); + return !node.isDaemon() && CU.affinityNode(node, cacheFilter); } /** @@ -2523,8 +2608,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if cache is accessible on the given node. */ public boolean cacheNode(ClusterNode node) { - return !node.isClient() && !node.isDaemon() && - (cacheFilter.apply(node) || clientNodes.containsKey(node.id())); + return !node.isDaemon() && (CU.affinityNode(node, cacheFilter) || clientNodes.containsKey(node.id())); } /** @@ -2535,8 +2619,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (node.isDaemon()) return false; - if (nearEnabled && cacheFilter.apply(node)) - return true; + if (CU.affinityNode(node, cacheFilter)) + return nearEnabled; Boolean near = clientNodes.get(node.id()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java index 9a81cd1..f1561bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java @@ -21,7 +21,6 @@ import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.*; import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.indexing.*; @@ -46,9 +45,6 @@ public class GridIndexingManager extends GridManagerAdapter<IndexingSpi> { * @throws IgniteCheckedException Thrown in case of any errors. */ @Override public void start() throws IgniteCheckedException { - if (!enabled()) - U.warn(log, "Indexing is disabled (to enable please configure GridIndexingSpi)."); - startSpi(); if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java index e9df8b8..5373e46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java @@ -68,6 +68,18 @@ class GridAffinityAssignment implements Serializable { } /** + * @param topVer Topology version. + * @param aff Assignment to copy from. + */ + GridAffinityAssignment(AffinityTopologyVersion topVer, GridAffinityAssignment aff) { + this.topVer = topVer; + + assignment = aff.assignment; + primary = aff.primary; + backup = aff.backup; + } + + /** * @return Affinity assignment. */ public List<List<ClusterNode>> assignment() {