# ignite-63
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0204d759 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0204d759 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0204d759 Branch: refs/heads/ignite-63 Commit: 0204d759f0b049fe8878acb6b4f125183fe55906 Parents: 172bc7a Author: sboikov <sboi...@gridgain.com> Authored: Fri Jan 23 11:54:54 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Jan 23 11:54:54 2015 +0300 ---------------------------------------------------------------------- .../impl/GridRouterCommandLineStartup.java | 2 +- .../org/apache/ignite/internal/GridGainEx.java | 2 +- .../ignite/internal/GridKernalContext.java | 2 +- .../ignite/internal/GridKernalContextImpl.java | 10 +- .../ignite/internal/IgniteComponentType.java | 6 +- .../internal/processors/fs/GridGgfsHelper.java | 49 -- .../processors/fs/GridGgfsHelperImpl.java | 54 --- .../processors/fs/GridGgfsProcessor.java | 463 ------------------- .../processors/fs/GridNoopGgfsHelper.java | 41 -- .../internal/processors/fs/IgniteFsHelper.java | 49 ++ .../processors/fs/IgniteFsHelperImpl.java | 54 +++ .../processors/fs/IgniteFsNoopHelper.java | 41 ++ .../processors/fs/IgniteFsProcessor.java | 463 +++++++++++++++++++ .../processors/spring/GridSpringProcessor.java | 73 --- .../spring/IgniteSpringProcessor.java | 73 +++ .../fs/GridGgfsProcessorSelfTest.java | 2 +- .../fs/GridGgfsProcessorValidationSelfTest.java | 6 +- .../spring/GridSpringProcessorImpl.java | 268 ----------- .../spring/IgniteSpringProcessorImpl.java | 268 +++++++++++ .../scala/org/apache/ignite/visor/visor.scala | 4 +- 20 files changed, 965 insertions(+), 965 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/core/src/main/java/org/apache/ignite/client/router/impl/GridRouterCommandLineStartup.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/client/router/impl/GridRouterCommandLineStartup.java b/modules/core/src/main/java/org/apache/ignite/client/router/impl/GridRouterCommandLineStartup.java index 4f591a4..691e35a 100644 --- a/modules/core/src/main/java/org/apache/ignite/client/router/impl/GridRouterCommandLineStartup.java +++ b/modules/core/src/main/java/org/apache/ignite/client/router/impl/GridRouterCommandLineStartup.java @@ -110,7 +110,7 @@ public class GridRouterCommandLineStartup { " " ); - GridSpringProcessor spring = SPRING.create(false); + IgniteSpringProcessor spring = SPRING.create(false); if (args.length < 1) { X.error("Missing XML configuration path."); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/core/src/main/java/org/apache/ignite/internal/GridGainEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridGainEx.java b/modules/core/src/main/java/org/apache/ignite/internal/GridGainEx.java index 3d134b1..ae541fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridGainEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridGainEx.java @@ -549,7 +549,7 @@ public class GridGainEx { */ public static IgniteBiTuple<Collection<IgniteConfiguration>, ? 
extends GridSpringResourceContext> loadConfigurations( URL springCfgUrl) throws IgniteCheckedException { - GridSpringProcessor spring = SPRING.create(false); + IgniteSpringProcessor spring = SPRING.create(false); return spring.loadConfigurations(springCfgUrl); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 7feaa53..282ace5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -292,7 +292,7 @@ public interface GridKernalContext extends Iterable<GridComponent> { * * @return GGFS utils processor. */ - public GridGgfsHelper ggfsHelper(); + public IgniteFsHelper ggfsHelper(); /** * Gets stream processor. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 6cedcc9..e9e4599 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -219,7 +219,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringInclude - private GridGgfsHelper ggfsHelper; + private IgniteFsHelper ggfsHelper; /** */ @GridToStringInclude @@ -259,7 +259,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude - private GridSpringProcessor spring; + private IgniteSpringProcessor spring; /** */ @GridToStringExclude @@ -462,8 +462,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable public void addHelper(Object helper) { assert helper != null; - if (helper instanceof GridGgfsHelper) - ggfsHelper = (GridGgfsHelper)helper; + if (helper instanceof IgniteFsHelper) + ggfsHelper = (IgniteFsHelper)helper; else assert false : "Unknown helper class: " + helper.getClass(); } @@ -672,7 +672,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ - @Override public GridGgfsHelper ggfsHelper() { + @Override public IgniteFsHelper ggfsHelper() { return ggfsHelper; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java index 54fb860..1e1fcc3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java @@ -42,15 +42,15 @@ public enum IgniteComponentType { /** GGFS helper component. */ GGFS_HELPER( - "org.gridgain.grid.kernal.processors.ggfs.GridNoopGgfsHelper", - "org.gridgain.grid.kernal.processors.ggfs.GridGgfsHelperImpl", + "org.apache.ignite.internal.processors.fs.IgniteFsNoopHelper", + "org.apache.ignite.internal.processors.fs.IgniteFsHelperImpl", "ignite-hadoop" ), /** Spring XML parsing. */ SPRING( null, - "org.apache.ignite.internal.processors.spring.GridSpringProcessorImpl", + "org.apache.ignite.internal.processors.spring.IgniteSpringProcessorImpl", "ignite-spring" ), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelper.java deleted file mode 100644 index c11e2b4..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelper.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; - -/** - * GGFS utility processor adapter. - */ -public interface GridGgfsHelper { - /** - * Pre-process cache configuration. - * - * @param cfg Cache configuration. - */ - public abstract void preProcessCacheConfiguration(CacheConfiguration cfg); - - /** - * Validate cache configuration for GGFS. - * - * @param cfg Cache configuration. - * @throws IgniteCheckedException If validation failed. - */ - public abstract void validateCacheConfiguration(CacheConfiguration cfg) throws IgniteCheckedException; - - /** - * Check whether object is of type {@code GridGgfsBlockKey} - * - * @param key Key. - * @return {@code True} if GGFS block key. - */ - public abstract boolean isGgfsBlockKey(Object key); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelperImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelperImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelperImpl.java deleted file mode 100644 index 84b1eaf..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelperImpl.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.eviction.*; -import org.apache.ignite.cache.eviction.ggfs.*; - -/** - * GGFS utils processor. - */ -public class GridGgfsHelperImpl implements GridGgfsHelper { - /** {@inheritDoc} */ - @Override public void preProcessCacheConfiguration(CacheConfiguration cfg) { - GridCacheEvictionPolicy evictPlc = cfg.getEvictionPolicy(); - - if (evictPlc instanceof GridCacheGgfsPerBlockLruEvictionPolicy && cfg.getEvictionFilter() == null) - cfg.setEvictionFilter(new GridCacheGgfsEvictionFilter()); - } - - /** {@inheritDoc} */ - @Override public void validateCacheConfiguration(CacheConfiguration cfg) throws IgniteCheckedException { - GridCacheEvictionPolicy evictPlc = cfg.getEvictionPolicy(); - - if (evictPlc != null && evictPlc instanceof GridCacheGgfsPerBlockLruEvictionPolicy) { - GridCacheEvictionFilter evictFilter = cfg.getEvictionFilter(); - - if (evictFilter != null && !(evictFilter instanceof GridCacheGgfsEvictionFilter)) - throw new IgniteCheckedException("Eviction filter cannot be set explicitly when using " + - "GridCacheGgfsPerBlockLruEvictionPolicy:" + cfg.getName()); - } - } - - /** {@inheritDoc} */ - @Override public boolean 
isGgfsBlockKey(Object key) { - return key instanceof GridGgfsBlockKey; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessor.java deleted file mode 100644 index b4fc340..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessor.java +++ /dev/null @@ -1,463 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.fs.*; -import org.apache.ignite.fs.mapreduce.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.processors.license.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.ipc.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.IgniteSystemProperties.*; -import static org.apache.ignite.cache.GridCacheMemoryMode.*; -import static org.apache.ignite.cache.GridCacheMode.*; -import static org.apache.ignite.fs.IgniteFsMode.*; -import static org.apache.ignite.internal.GridNodeAttributes.*; -import static org.apache.ignite.internal.processors.license.GridLicenseSubsystem.*; - -/** - * Fully operational Ignite file system processor. - */ -public class GridGgfsProcessor extends IgniteFsProcessorAdapter { - /** Null GGFS name. */ - private static final String NULL_NAME = UUID.randomUUID().toString(); - - /** Converts context to GGFS. */ - private static final IgniteClosure<GridGgfsContext,IgniteFs> CTX_TO_GGFS = new C1<GridGgfsContext, IgniteFs>() { - @Override public IgniteFs apply(GridGgfsContext ggfsCtx) { - return ggfsCtx.ggfs(); - } - }; - - /** */ - private final ConcurrentMap<String, GridGgfsContext> ggfsCache = - new ConcurrentHashMap8<>(); - - /** - * @param ctx Kernal context. 
- */ - public GridGgfsProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; - - IgniteFsConfiguration[] cfgs = ctx.config().getGgfsConfiguration(); - - assert cfgs != null && cfgs.length > 0; - - // Register GGFS messages. - GridTcpCommunicationMessageFactory.registerCommon(new GridTcpCommunicationMessageProducer() { - @Override - public GridTcpCommunicationMessageAdapter create(byte type) { - switch (type) { - case 65: - return new GridGgfsAckMessage(); - - case 66: - return new GridGgfsBlockKey(); - - case 67: - return new GridGgfsBlocksMessage(); - - case 68: - return new GridGgfsDeleteMessage(); - - case 69: - return new GridGgfsFileAffinityRange(); - - case 70: - return new GridGgfsFragmentizerRequest(); - - case 71: - return new GridGgfsFragmentizerResponse(); - - case 72: - return new GridGgfsSyncMessage(); - - default: - assert false : "Invalid GGFS message type."; - - return null; - } - } - }, 65, 66, 67, 68, 69,70, 71, 72); - - // Register HDFS edition usage with license manager. - GridLicenseUseRegistry.onUsage(HADOOP, getClass()); - - validateLocalGgfsConfigurations(cfgs); - - // Start GGFS instances. - for (IgniteFsConfiguration cfg : cfgs) { - GridGgfsContext ggfsCtx = new GridGgfsContext( - ctx, - new IgniteFsConfiguration(cfg), - new GridGgfsMetaManager(), - new GridGgfsDataManager(), - new GridGgfsServerManager(), - new GridGgfsFragmentizerManager()); - - // Start managers first. 
- for (GridGgfsManager mgr : ggfsCtx.managers()) - mgr.start(ggfsCtx); - - ggfsCache.put(maskName(cfg.getName()), ggfsCtx); - } - - if (log.isDebugEnabled()) - log.debug("GGFS processor started."); - } - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; - - if (!getBoolean(GG_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { - for (ClusterNode n : ctx.discovery().remoteNodes()) - checkGgfsOnRemoteNode(n); - } - - for (GridGgfsContext ggfsCtx : ggfsCache.values()) - for (GridGgfsManager mgr : ggfsCtx.managers()) - mgr.onKernalStart(); - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) { - // Stop GGFS instances. - for (GridGgfsContext ggfsCtx : ggfsCache.values()) { - if (log.isDebugEnabled()) - log.debug("Stopping ggfs: " + ggfsCtx.configuration().getName()); - - List<GridGgfsManager> mgrs = ggfsCtx.managers(); - - for (ListIterator<GridGgfsManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { - GridGgfsManager mgr = it.previous(); - - mgr.stop(cancel); - } - - ggfsCtx.ggfs().stop(); - } - - ggfsCache.clear(); - - if (log.isDebugEnabled()) - log.debug("GGFS processor stopped."); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - for (GridGgfsContext ggfsCtx : ggfsCache.values()) { - if (log.isDebugEnabled()) - log.debug("Stopping ggfs: " + ggfsCtx.configuration().getName()); - - List<GridGgfsManager> mgrs = ggfsCtx.managers(); - - for (ListIterator<GridGgfsManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { - GridGgfsManager mgr = it.previous(); - - mgr.onKernalStop(cancel); - } - } - - if (log.isDebugEnabled()) - log.debug("Finished executing GGFS processor onKernalStop() callback."); - } - - /** {@inheritDoc} */ - @Override public void printMemoryStats() { - X.println(">>>"); - X.println(">>> GGFS processor memory stats [grid=" + ctx.gridName() + ']'); - X.println(">>> ggfsCacheSize: " + 
ggfsCache.size()); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public Collection<IgniteFs> ggfss() { - return F.viewReadOnly(ggfsCache.values(), CTX_TO_GGFS); - } - - /** {@inheritDoc} */ - @Override @Nullable public IgniteFs ggfs(@Nullable String name) { - GridGgfsContext ggfsCtx = ggfsCache.get(maskName(name)); - - return ggfsCtx == null ? null : ggfsCtx.ggfs(); - } - - /** {@inheritDoc} */ - @Override @Nullable public Collection<GridIpcServerEndpoint> endpoints(@Nullable String name) { - GridGgfsContext ggfsCtx = ggfsCache.get(maskName(name)); - - return ggfsCtx == null ? Collections.<GridIpcServerEndpoint>emptyList() : ggfsCtx.server().endpoints(); - } - - /** {@inheritDoc} */ - @Nullable @Override public ComputeJob createJob(IgniteFsJob job, @Nullable String ggfsName, IgniteFsPath path, - long start, long length, IgniteFsRecordResolver recRslv) { - return new GridGgfsJobImpl(job, ggfsName, path, start, length, recRslv); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void addAttributes(Map<String, Object> attrs) throws IgniteCheckedException { - super.addAttributes(attrs); - - IgniteConfiguration gridCfg = ctx.config(); - - // Node doesn't have GGFS if it: - // is daemon; - // doesn't have configured GGFS; - // doesn't have configured caches. 
- if (gridCfg.isDaemon() || F.isEmpty(gridCfg.getGgfsConfiguration()) || - F.isEmpty(gridCfg.getCacheConfiguration())) - return; - - final Map<String, CacheConfiguration> cacheCfgs = new HashMap<>(); - - F.forEach(gridCfg.getCacheConfiguration(), new CI1<CacheConfiguration>() { - @Override public void apply(CacheConfiguration c) { - cacheCfgs.put(c.getName(), c); - } - }); - - Collection<GridGgfsAttributes> attrVals = new ArrayList<>(); - - assert gridCfg.getGgfsConfiguration() != null; - - for (IgniteFsConfiguration ggfsCfg : gridCfg.getGgfsConfiguration()) { - CacheConfiguration cacheCfg = cacheCfgs.get(ggfsCfg.getDataCacheName()); - - if (cacheCfg == null) - continue; // No cache for the given GGFS configuration. - - GridCacheAffinityKeyMapper affMapper = cacheCfg.getAffinityMapper(); - - if (!(affMapper instanceof IgniteFsGroupDataBlocksKeyMapper)) - // Do not create GGFS attributes for such a node nor throw error about invalid configuration. - // Configuration will be validated later, while starting GridGgfsProcessor. - continue; - - attrVals.add(new GridGgfsAttributes( - ggfsCfg.getName(), - ggfsCfg.getBlockSize(), - ((IgniteFsGroupDataBlocksKeyMapper)affMapper).groupSize(), - ggfsCfg.getMetaCacheName(), - ggfsCfg.getDataCacheName(), - ggfsCfg.getDefaultMode(), - ggfsCfg.getPathModes(), - ggfsCfg.isFragmentizerEnabled())); - } - - attrs.put(ATTR_GGFS, attrVals.toArray(new GridGgfsAttributes[attrVals.size()])); - } - - /** - * @param name Cache name. - * @return Masked name accounting for {@code nulls}. - */ - private String maskName(@Nullable String name) { - return name == null ? NULL_NAME : name; - } - - /** - * Validates local GGFS configurations. Compares attributes only for GGFSes with same name. - * @param cfgs GGFS configurations - * @throws IgniteCheckedException If any of GGFS configurations is invalid. 
- */ - private void validateLocalGgfsConfigurations(IgniteFsConfiguration[] cfgs) throws IgniteCheckedException { - Collection<String> cfgNames = new HashSet<>(); - - for (IgniteFsConfiguration cfg : cfgs) { - String name = cfg.getName(); - - if (cfgNames.contains(name)) - throw new IgniteCheckedException("Duplicate GGFS name found (check configuration and " + - "assign unique name to each): " + name); - - GridCacheAdapter<Object, Object> dataCache = ctx.cache().internalCache(cfg.getDataCacheName()); - - if (dataCache == null) - throw new IgniteCheckedException("Data cache is not configured locally for GGFS: " + cfg); - - if (dataCache.configuration().isQueryIndexEnabled()) - throw new IgniteCheckedException("GGFS data cache cannot start with enabled query indexing."); - - GridCache<Object, Object> metaCache = ctx.cache().cache(cfg.getMetaCacheName()); - - if (metaCache == null) - throw new IgniteCheckedException("Metadata cache is not configured locally for GGFS: " + cfg); - - if (metaCache.configuration().isQueryIndexEnabled()) - throw new IgniteCheckedException("GGFS metadata cache cannot start with enabled query indexing."); - - if (F.eq(cfg.getDataCacheName(), cfg.getMetaCacheName())) - throw new IgniteCheckedException("Cannot use same cache as both data and meta cache: " + cfg.getName()); - - if (!(dataCache.configuration().getAffinityMapper() instanceof IgniteFsGroupDataBlocksKeyMapper)) - throw new IgniteCheckedException("Invalid GGFS data cache configuration (key affinity mapper class should be " + - IgniteFsGroupDataBlocksKeyMapper.class.getSimpleName() + "): " + cfg); - - long maxSpaceSize = cfg.getMaxSpaceSize(); - - if (maxSpaceSize > 0) { - // Max space validation. - long maxHeapSize = Runtime.getRuntime().maxMemory(); - long offHeapSize = dataCache.configuration().getOffHeapMaxMemory(); - - if (offHeapSize < 0 && maxSpaceSize > maxHeapSize) - // Offheap is disabled. 
- throw new IgniteCheckedException("Maximum GGFS space size cannot be greater that size of available heap " + - "memory [maxHeapSize=" + maxHeapSize + ", maxGgfsSpaceSize=" + maxSpaceSize + ']'); - else if (offHeapSize > 0 && maxSpaceSize > maxHeapSize + offHeapSize) - // Offheap is enabled, but limited. - throw new IgniteCheckedException("Maximum GGFS space size cannot be greater than size of available heap " + - "memory and offheap storage [maxHeapSize=" + maxHeapSize + ", offHeapSize=" + offHeapSize + - ", maxGgfsSpaceSize=" + maxSpaceSize + ']'); - } - - if (dataCache.configuration().getCacheMode() == PARTITIONED) { - int backups = dataCache.configuration().getBackups(); - - if (backups != 0) - throw new IgniteCheckedException("GGFS data cache cannot be used with backups (set backup count " + - "to 0 and restart the grid): " + cfg.getDataCacheName()); - } - - if (cfg.getMaxSpaceSize() == 0 && dataCache.configuration().getMemoryMode() == OFFHEAP_VALUES) - U.warn(log, "GGFS max space size is not specified but data cache values are stored off-heap (max " + - "space will be limited to 80% of max JVM heap size): " + cfg.getName()); - - boolean secondary = cfg.getDefaultMode() == PROXY; - - if (cfg.getPathModes() != null) { - for (Map.Entry<String, IgniteFsMode> mode : cfg.getPathModes().entrySet()) { - if (mode.getValue() == PROXY) - secondary = true; - } - } - - if (secondary) { - // When working in any mode except of primary, secondary FS config must be provided. - assertParameter(cfg.getSecondaryFileSystem() != null, - "secondaryFileSystem cannot be null when mode is SECONDARY"); - } - - cfgNames.add(name); - } - } - - /** - * Check GGFS config on remote node. - * - * @param rmtNode Remote node. - * @throws IgniteCheckedException If check failed. 
- */ - private void checkGgfsOnRemoteNode(ClusterNode rmtNode) throws IgniteCheckedException { - GridGgfsAttributes[] locAttrs = ctx.discovery().localNode().attribute(GridNodeAttributes.ATTR_GGFS); - GridGgfsAttributes[] rmtAttrs = rmtNode.attribute(GridNodeAttributes.ATTR_GGFS); - - if (F.isEmpty(locAttrs) || F.isEmpty(rmtAttrs)) - return; - - assert rmtAttrs != null && locAttrs != null; - - for (GridGgfsAttributes rmtAttr : rmtAttrs) - for (GridGgfsAttributes locAttr : locAttrs) { - // Checking the use of different caches on the different GGFSes. - if (!F.eq(rmtAttr.ggfsName(), locAttr.ggfsName())) { - if (F.eq(rmtAttr.metaCacheName(), locAttr.metaCacheName())) - throw new IgniteCheckedException("Meta cache names should be different for different GGFS instances " + - "configuration (fix configuration or set " + - "-D" + GG_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " + - "property) [metaCacheName=" + rmtAttr.metaCacheName() + - ", locNodeId=" + ctx.localNodeId() + - ", rmtNodeId=" + rmtNode.id() + - ", locGgfsName=" + locAttr.ggfsName() + - ", rmtGgfsName=" + rmtAttr.ggfsName() + ']'); - - if (F.eq(rmtAttr.dataCacheName(), locAttr.dataCacheName())) - throw new IgniteCheckedException("Data cache names should be different for different GGFS instances " + - "configuration (fix configuration or set " + - "-D" + GG_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " + - "property)[dataCacheName=" + rmtAttr.dataCacheName() + - ", locNodeId=" + ctx.localNodeId() + - ", rmtNodeId=" + rmtNode.id() + - ", locGgfsName=" + locAttr.ggfsName() + - ", rmtGgfsName=" + rmtAttr.ggfsName() + ']'); - - continue; - } - - // Compare other attributes only for GGFSes with same name. 
- checkSame("Data block size", "BlockSize", rmtNode.id(), rmtAttr.blockSize(), - locAttr.blockSize(), rmtAttr.ggfsName()); - - checkSame("Affinity mapper group size", "GrpSize", rmtNode.id(), rmtAttr.groupSize(), - locAttr.groupSize(), rmtAttr.ggfsName()); - - checkSame("Meta cache name", "MetaCacheName", rmtNode.id(), rmtAttr.metaCacheName(), - locAttr.metaCacheName(), rmtAttr.ggfsName()); - - checkSame("Data cache name", "DataCacheName", rmtNode.id(), rmtAttr.dataCacheName(), - locAttr.dataCacheName(), rmtAttr.ggfsName()); - - checkSame("Default mode", "DefaultMode", rmtNode.id(), rmtAttr.defaultMode(), - locAttr.defaultMode(), rmtAttr.ggfsName()); - - checkSame("Path modes", "PathModes", rmtNode.id(), rmtAttr.pathModes(), - locAttr.pathModes(), rmtAttr.ggfsName()); - - checkSame("Fragmentizer enabled", "FragmentizerEnabled", rmtNode.id(), rmtAttr.fragmentizerEnabled(), - locAttr.fragmentizerEnabled(), rmtAttr.ggfsName()); - } - } - - private void checkSame(String name, String propName, UUID rmtNodeId, Object rmtVal, Object locVal, String ggfsName) - throws IgniteCheckedException { - if (!F.eq(rmtVal, locVal)) - throw new IgniteCheckedException(name + " should be the same on all nodes in grid for GGFS configuration " + - "(fix configuration or set " + - "-D" + GG_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " + - "property ) [rmtNodeId=" + rmtNodeId + - ", rmt" + propName + "=" + rmtVal + - ", loc" + propName + "=" + locVal + - ", ggfName=" + ggfsName + ']'); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridNoopGgfsHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridNoopGgfsHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridNoopGgfsHelper.java deleted file mode 100644 index c376ecd..0000000 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridNoopGgfsHelper.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; - -/** - * No-op utils processor adapter. - */ -public class GridNoopGgfsHelper implements GridGgfsHelper { - /** {@inheritDoc} */ - @Override public void preProcessCacheConfiguration(CacheConfiguration cfg) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void validateCacheConfiguration(CacheConfiguration cfg) throws IgniteCheckedException { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public boolean isGgfsBlockKey(Object key) { - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsHelper.java new file mode 100644 index 0000000..f43be6f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsHelper.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; + +/** + * GGFS utility processor adapter. + */ +public interface IgniteFsHelper { + /** + * Pre-process cache configuration. + * + * @param cfg Cache configuration. + */ + public abstract void preProcessCacheConfiguration(CacheConfiguration cfg); + + /** + * Validate cache configuration for GGFS. + * + * @param cfg Cache configuration. 
+ * @throws IgniteCheckedException If validation failed. + */ + public abstract void validateCacheConfiguration(CacheConfiguration cfg) throws IgniteCheckedException; + + /** + * Check whether object is of type {@code GridGgfsBlockKey} + * + * @param key Key. + * @return {@code True} if GGFS block key. + */ + public abstract boolean isGgfsBlockKey(Object key); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsHelperImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsHelperImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsHelperImpl.java new file mode 100644 index 0000000..7f95036 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsHelperImpl.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.*; +import org.apache.ignite.cache.eviction.ggfs.*; + +/** + * GGFS utils processor. + */ +public class IgniteFsHelperImpl implements IgniteFsHelper { + /** {@inheritDoc} */ + @Override public void preProcessCacheConfiguration(CacheConfiguration cfg) { + GridCacheEvictionPolicy evictPlc = cfg.getEvictionPolicy(); + + if (evictPlc instanceof GridCacheGgfsPerBlockLruEvictionPolicy && cfg.getEvictionFilter() == null) + cfg.setEvictionFilter(new GridCacheGgfsEvictionFilter()); + } + + /** {@inheritDoc} */ + @Override public void validateCacheConfiguration(CacheConfiguration cfg) throws IgniteCheckedException { + GridCacheEvictionPolicy evictPlc = cfg.getEvictionPolicy(); + + if (evictPlc != null && evictPlc instanceof GridCacheGgfsPerBlockLruEvictionPolicy) { + GridCacheEvictionFilter evictFilter = cfg.getEvictionFilter(); + + if (evictFilter != null && !(evictFilter instanceof GridCacheGgfsEvictionFilter)) + throw new IgniteCheckedException("Eviction filter cannot be set explicitly when using " + + "GridCacheGgfsPerBlockLruEvictionPolicy:" + cfg.getName()); + } + } + + /** {@inheritDoc} */ + @Override public boolean isGgfsBlockKey(Object key) { + return key instanceof GridGgfsBlockKey; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsNoopHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsNoopHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsNoopHelper.java new file mode 100644 index 0000000..c75d838 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsNoopHelper.java @@ -0,0 +1,41 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; + +/** + * No-op utils processor adapter. + */ +public class IgniteFsNoopHelper implements IgniteFsHelper { + /** {@inheritDoc} */ + @Override public void preProcessCacheConfiguration(CacheConfiguration cfg) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void validateCacheConfiguration(CacheConfiguration cfg) throws IgniteCheckedException { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public boolean isGgfsBlockKey(Object key) { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java new file mode 100644 index 0000000..5b84497 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.fs.*; +import org.apache.ignite.fs.mapreduce.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.processors.license.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.ipc.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.IgniteSystemProperties.*; +import static org.apache.ignite.cache.GridCacheMemoryMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.fs.IgniteFsMode.*; +import static org.apache.ignite.internal.GridNodeAttributes.*; +import static org.apache.ignite.internal.processors.license.GridLicenseSubsystem.*; + +/** + * Fully operational Ignite file system processor. + */ +public class IgniteFsProcessor extends IgniteFsProcessorAdapter { + /** Null GGFS name. */ + private static final String NULL_NAME = UUID.randomUUID().toString(); + + /** Converts context to GGFS. */ + private static final IgniteClosure<GridGgfsContext,IgniteFs> CTX_TO_GGFS = new C1<GridGgfsContext, IgniteFs>() { + @Override public IgniteFs apply(GridGgfsContext ggfsCtx) { + return ggfsCtx.ggfs(); + } + }; + + /** */ + private final ConcurrentMap<String, GridGgfsContext> ggfsCache = + new ConcurrentHashMap8<>(); + + /** + * @param ctx Kernal context. 
+ */ + public IgniteFsProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + if (ctx.config().isDaemon()) + return; + + IgniteFsConfiguration[] cfgs = ctx.config().getGgfsConfiguration(); + + assert cfgs != null && cfgs.length > 0; + + // Register GGFS messages. + GridTcpCommunicationMessageFactory.registerCommon(new GridTcpCommunicationMessageProducer() { + @Override + public GridTcpCommunicationMessageAdapter create(byte type) { + switch (type) { + case 65: + return new GridGgfsAckMessage(); + + case 66: + return new GridGgfsBlockKey(); + + case 67: + return new GridGgfsBlocksMessage(); + + case 68: + return new GridGgfsDeleteMessage(); + + case 69: + return new GridGgfsFileAffinityRange(); + + case 70: + return new GridGgfsFragmentizerRequest(); + + case 71: + return new GridGgfsFragmentizerResponse(); + + case 72: + return new GridGgfsSyncMessage(); + + default: + assert false : "Invalid GGFS message type."; + + return null; + } + } + }, 65, 66, 67, 68, 69,70, 71, 72); + + // Register HDFS edition usage with license manager. + GridLicenseUseRegistry.onUsage(HADOOP, getClass()); + + validateLocalGgfsConfigurations(cfgs); + + // Start GGFS instances. + for (IgniteFsConfiguration cfg : cfgs) { + GridGgfsContext ggfsCtx = new GridGgfsContext( + ctx, + new IgniteFsConfiguration(cfg), + new GridGgfsMetaManager(), + new GridGgfsDataManager(), + new GridGgfsServerManager(), + new GridGgfsFragmentizerManager()); + + // Start managers first. 
+ for (GridGgfsManager mgr : ggfsCtx.managers()) + mgr.start(ggfsCtx); + + ggfsCache.put(maskName(cfg.getName()), ggfsCtx); + } + + if (log.isDebugEnabled()) + log.debug("GGFS processor started."); + } + + /** {@inheritDoc} */ + @Override public void onKernalStart() throws IgniteCheckedException { + if (ctx.config().isDaemon()) + return; + + if (!getBoolean(GG_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { + for (ClusterNode n : ctx.discovery().remoteNodes()) + checkGgfsOnRemoteNode(n); + } + + for (GridGgfsContext ggfsCtx : ggfsCache.values()) + for (GridGgfsManager mgr : ggfsCtx.managers()) + mgr.onKernalStart(); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) { + // Stop GGFS instances. + for (GridGgfsContext ggfsCtx : ggfsCache.values()) { + if (log.isDebugEnabled()) + log.debug("Stopping ggfs: " + ggfsCtx.configuration().getName()); + + List<GridGgfsManager> mgrs = ggfsCtx.managers(); + + for (ListIterator<GridGgfsManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { + GridGgfsManager mgr = it.previous(); + + mgr.stop(cancel); + } + + ggfsCtx.ggfs().stop(); + } + + ggfsCache.clear(); + + if (log.isDebugEnabled()) + log.debug("GGFS processor stopped."); + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + for (GridGgfsContext ggfsCtx : ggfsCache.values()) { + if (log.isDebugEnabled()) + log.debug("Stopping ggfs: " + ggfsCtx.configuration().getName()); + + List<GridGgfsManager> mgrs = ggfsCtx.managers(); + + for (ListIterator<GridGgfsManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { + GridGgfsManager mgr = it.previous(); + + mgr.onKernalStop(cancel); + } + } + + if (log.isDebugEnabled()) + log.debug("Finished executing GGFS processor onKernalStop() callback."); + } + + /** {@inheritDoc} */ + @Override public void printMemoryStats() { + X.println(">>>"); + X.println(">>> GGFS processor memory stats [grid=" + ctx.gridName() + ']'); + X.println(">>> ggfsCacheSize: " + 
ggfsCache.size()); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Collection<IgniteFs> ggfss() { + return F.viewReadOnly(ggfsCache.values(), CTX_TO_GGFS); + } + + /** {@inheritDoc} */ + @Override @Nullable public IgniteFs ggfs(@Nullable String name) { + GridGgfsContext ggfsCtx = ggfsCache.get(maskName(name)); + + return ggfsCtx == null ? null : ggfsCtx.ggfs(); + } + + /** {@inheritDoc} */ + @Override @Nullable public Collection<GridIpcServerEndpoint> endpoints(@Nullable String name) { + GridGgfsContext ggfsCtx = ggfsCache.get(maskName(name)); + + return ggfsCtx == null ? Collections.<GridIpcServerEndpoint>emptyList() : ggfsCtx.server().endpoints(); + } + + /** {@inheritDoc} */ + @Nullable @Override public ComputeJob createJob(IgniteFsJob job, @Nullable String ggfsName, IgniteFsPath path, + long start, long length, IgniteFsRecordResolver recRslv) { + return new GridGgfsJobImpl(job, ggfsName, path, start, length, recRslv); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void addAttributes(Map<String, Object> attrs) throws IgniteCheckedException { + super.addAttributes(attrs); + + IgniteConfiguration gridCfg = ctx.config(); + + // Node doesn't have GGFS if it: + // is daemon; + // doesn't have configured GGFS; + // doesn't have configured caches. 
+ if (gridCfg.isDaemon() || F.isEmpty(gridCfg.getGgfsConfiguration()) || + F.isEmpty(gridCfg.getCacheConfiguration())) + return; + + final Map<String, CacheConfiguration> cacheCfgs = new HashMap<>(); + + F.forEach(gridCfg.getCacheConfiguration(), new CI1<CacheConfiguration>() { + @Override public void apply(CacheConfiguration c) { + cacheCfgs.put(c.getName(), c); + } + }); + + Collection<GridGgfsAttributes> attrVals = new ArrayList<>(); + + assert gridCfg.getGgfsConfiguration() != null; + + for (IgniteFsConfiguration ggfsCfg : gridCfg.getGgfsConfiguration()) { + CacheConfiguration cacheCfg = cacheCfgs.get(ggfsCfg.getDataCacheName()); + + if (cacheCfg == null) + continue; // No cache for the given GGFS configuration. + + GridCacheAffinityKeyMapper affMapper = cacheCfg.getAffinityMapper(); + + if (!(affMapper instanceof IgniteFsGroupDataBlocksKeyMapper)) + // Do not create GGFS attributes for such a node nor throw error about invalid configuration. + // Configuration will be validated later, while starting IgniteFsProcessor. + continue; + + attrVals.add(new GridGgfsAttributes( + ggfsCfg.getName(), + ggfsCfg.getBlockSize(), + ((IgniteFsGroupDataBlocksKeyMapper)affMapper).groupSize(), + ggfsCfg.getMetaCacheName(), + ggfsCfg.getDataCacheName(), + ggfsCfg.getDefaultMode(), + ggfsCfg.getPathModes(), + ggfsCfg.isFragmentizerEnabled())); + } + + attrs.put(ATTR_GGFS, attrVals.toArray(new GridGgfsAttributes[attrVals.size()])); + } + + /** + * @param name Cache name. + * @return Masked name accounting for {@code nulls}. + */ + private String maskName(@Nullable String name) { + return name == null ? NULL_NAME : name; + } + + /** + * Validates local GGFS configurations. Compares attributes only for GGFSes with same name. + * @param cfgs GGFS configurations + * @throws IgniteCheckedException If any of GGFS configurations is invalid. 
+ */ + private void validateLocalGgfsConfigurations(IgniteFsConfiguration[] cfgs) throws IgniteCheckedException { + Collection<String> cfgNames = new HashSet<>(); + + for (IgniteFsConfiguration cfg : cfgs) { + String name = cfg.getName(); + + if (cfgNames.contains(name)) + throw new IgniteCheckedException("Duplicate GGFS name found (check configuration and " + + "assign unique name to each): " + name); + + GridCacheAdapter<Object, Object> dataCache = ctx.cache().internalCache(cfg.getDataCacheName()); + + if (dataCache == null) + throw new IgniteCheckedException("Data cache is not configured locally for GGFS: " + cfg); + + if (dataCache.configuration().isQueryIndexEnabled()) + throw new IgniteCheckedException("GGFS data cache cannot start with enabled query indexing."); + + GridCache<Object, Object> metaCache = ctx.cache().cache(cfg.getMetaCacheName()); + + if (metaCache == null) + throw new IgniteCheckedException("Metadata cache is not configured locally for GGFS: " + cfg); + + if (metaCache.configuration().isQueryIndexEnabled()) + throw new IgniteCheckedException("GGFS metadata cache cannot start with enabled query indexing."); + + if (F.eq(cfg.getDataCacheName(), cfg.getMetaCacheName())) + throw new IgniteCheckedException("Cannot use same cache as both data and meta cache: " + cfg.getName()); + + if (!(dataCache.configuration().getAffinityMapper() instanceof IgniteFsGroupDataBlocksKeyMapper)) + throw new IgniteCheckedException("Invalid GGFS data cache configuration (key affinity mapper class should be " + + IgniteFsGroupDataBlocksKeyMapper.class.getSimpleName() + "): " + cfg); + + long maxSpaceSize = cfg.getMaxSpaceSize(); + + if (maxSpaceSize > 0) { + // Max space validation. + long maxHeapSize = Runtime.getRuntime().maxMemory(); + long offHeapSize = dataCache.configuration().getOffHeapMaxMemory(); + + if (offHeapSize < 0 && maxSpaceSize > maxHeapSize) + // Offheap is disabled. 
+ throw new IgniteCheckedException("Maximum GGFS space size cannot be greater that size of available heap " + + "memory [maxHeapSize=" + maxHeapSize + ", maxGgfsSpaceSize=" + maxSpaceSize + ']'); + else if (offHeapSize > 0 && maxSpaceSize > maxHeapSize + offHeapSize) + // Offheap is enabled, but limited. + throw new IgniteCheckedException("Maximum GGFS space size cannot be greater than size of available heap " + + "memory and offheap storage [maxHeapSize=" + maxHeapSize + ", offHeapSize=" + offHeapSize + + ", maxGgfsSpaceSize=" + maxSpaceSize + ']'); + } + + if (dataCache.configuration().getCacheMode() == PARTITIONED) { + int backups = dataCache.configuration().getBackups(); + + if (backups != 0) + throw new IgniteCheckedException("GGFS data cache cannot be used with backups (set backup count " + + "to 0 and restart the grid): " + cfg.getDataCacheName()); + } + + if (cfg.getMaxSpaceSize() == 0 && dataCache.configuration().getMemoryMode() == OFFHEAP_VALUES) + U.warn(log, "GGFS max space size is not specified but data cache values are stored off-heap (max " + + "space will be limited to 80% of max JVM heap size): " + cfg.getName()); + + boolean secondary = cfg.getDefaultMode() == PROXY; + + if (cfg.getPathModes() != null) { + for (Map.Entry<String, IgniteFsMode> mode : cfg.getPathModes().entrySet()) { + if (mode.getValue() == PROXY) + secondary = true; + } + } + + if (secondary) { + // When working in any mode except of primary, secondary FS config must be provided. + assertParameter(cfg.getSecondaryFileSystem() != null, + "secondaryFileSystem cannot be null when mode is SECONDARY"); + } + + cfgNames.add(name); + } + } + + /** + * Check GGFS config on remote node. + * + * @param rmtNode Remote node. + * @throws IgniteCheckedException If check failed. 
+ */ + private void checkGgfsOnRemoteNode(ClusterNode rmtNode) throws IgniteCheckedException { + GridGgfsAttributes[] locAttrs = ctx.discovery().localNode().attribute(GridNodeAttributes.ATTR_GGFS); + GridGgfsAttributes[] rmtAttrs = rmtNode.attribute(GridNodeAttributes.ATTR_GGFS); + + if (F.isEmpty(locAttrs) || F.isEmpty(rmtAttrs)) + return; + + assert rmtAttrs != null && locAttrs != null; + + for (GridGgfsAttributes rmtAttr : rmtAttrs) + for (GridGgfsAttributes locAttr : locAttrs) { + // Checking the use of different caches on the different GGFSes. + if (!F.eq(rmtAttr.ggfsName(), locAttr.ggfsName())) { + if (F.eq(rmtAttr.metaCacheName(), locAttr.metaCacheName())) + throw new IgniteCheckedException("Meta cache names should be different for different GGFS instances " + + "configuration (fix configuration or set " + + "-D" + GG_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " + + "property) [metaCacheName=" + rmtAttr.metaCacheName() + + ", locNodeId=" + ctx.localNodeId() + + ", rmtNodeId=" + rmtNode.id() + + ", locGgfsName=" + locAttr.ggfsName() + + ", rmtGgfsName=" + rmtAttr.ggfsName() + ']'); + + if (F.eq(rmtAttr.dataCacheName(), locAttr.dataCacheName())) + throw new IgniteCheckedException("Data cache names should be different for different GGFS instances " + + "configuration (fix configuration or set " + + "-D" + GG_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " + + "property)[dataCacheName=" + rmtAttr.dataCacheName() + + ", locNodeId=" + ctx.localNodeId() + + ", rmtNodeId=" + rmtNode.id() + + ", locGgfsName=" + locAttr.ggfsName() + + ", rmtGgfsName=" + rmtAttr.ggfsName() + ']'); + + continue; + } + + // Compare other attributes only for GGFSes with same name. 
+ checkSame("Data block size", "BlockSize", rmtNode.id(), rmtAttr.blockSize(), + locAttr.blockSize(), rmtAttr.ggfsName()); + + checkSame("Affinity mapper group size", "GrpSize", rmtNode.id(), rmtAttr.groupSize(), + locAttr.groupSize(), rmtAttr.ggfsName()); + + checkSame("Meta cache name", "MetaCacheName", rmtNode.id(), rmtAttr.metaCacheName(), + locAttr.metaCacheName(), rmtAttr.ggfsName()); + + checkSame("Data cache name", "DataCacheName", rmtNode.id(), rmtAttr.dataCacheName(), + locAttr.dataCacheName(), rmtAttr.ggfsName()); + + checkSame("Default mode", "DefaultMode", rmtNode.id(), rmtAttr.defaultMode(), + locAttr.defaultMode(), rmtAttr.ggfsName()); + + checkSame("Path modes", "PathModes", rmtNode.id(), rmtAttr.pathModes(), + locAttr.pathModes(), rmtAttr.ggfsName()); + + checkSame("Fragmentizer enabled", "FragmentizerEnabled", rmtNode.id(), rmtAttr.fragmentizerEnabled(), + locAttr.fragmentizerEnabled(), rmtAttr.ggfsName()); + } + } + + private void checkSame(String name, String propName, UUID rmtNodeId, Object rmtVal, Object locVal, String ggfsName) + throws IgniteCheckedException { + if (!F.eq(rmtVal, locVal)) + throw new IgniteCheckedException(name + " should be the same on all nodes in grid for GGFS configuration " + + "(fix configuration or set " + + "-D" + GG_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " + + "property ) [rmtNodeId=" + rmtNodeId + + ", rmt" + propName + "=" + rmtVal + + ", loc" + propName + "=" + locVal + + ", ggfName=" + ggfsName + ']'); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/core/src/main/java/org/apache/ignite/internal/processors/spring/GridSpringProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/spring/GridSpringProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/spring/GridSpringProcessor.java deleted file mode 100644 index 1c8c195..0000000 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/spring/GridSpringProcessor.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.spring; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.processors.resource.*; - -import java.net.*; -import java.util.*; - -/** - * Spring processor which can parse Spring configuration files, interface was introduced to avoid mandatory - * runtime dependency on Spring framework. - */ -public interface GridSpringProcessor { - /** - * Loads all grid configurations specified within given configuration file. - * <p> - * Usually Spring XML configuration file will contain only one Grid definition. Note that - * Grid configuration bean(s) is retrieved form configuration file by type, so the name of - * the Grid configuration bean is ignored. - * - * @param cfgUrl Configuration file path or URL. This cannot be {@code null}. - * @param excludedProps Properties to exclude. - * @return Tuple containing all loaded configurations and Spring context used to load them. 
- * @throws IgniteCheckedException If grid could not be started or configuration - * read. This exception will be thrown also if grid with given name has already - * been started or Spring XML configuration file is invalid. - */ - public IgniteBiTuple<Collection<IgniteConfiguration>, ? extends GridSpringResourceContext> loadConfigurations( - URL cfgUrl, String... excludedProps) throws IgniteCheckedException; - - /** - * Loads bean instances that match the given types from given configuration file. - * - * @param cfgUrl Configuration file path or URL. This cannot be {@code null}. - * @param beanClasses Beans classes. - * @return Bean class -> loaded bean instance map, if configuration does not contain bean with required type the - * map value is {@code null}. - * @throws IgniteCheckedException If failed to load configuration. - */ - public Map<Class<?>, Object> loadBeans(URL cfgUrl, Class<?>... beanClasses) throws IgniteCheckedException; - - /** - * Gets user version for given class loader by checking - * {@code META-INF/gridgain.xml} file for {@code userVersion} attribute. If - * {@code gridgain.xml} file is not found, or user version is not specified there, - * then default version (empty string) is returned. - * - * @param ldr Class loader. - * @param log Logger. - * @return User version for given class loader or empty string if no version - * was explicitly specified. 
- */ - public String userVersion(ClassLoader ldr, IgniteLogger log); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/core/src/main/java/org/apache/ignite/internal/processors/spring/IgniteSpringProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/spring/IgniteSpringProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/spring/IgniteSpringProcessor.java new file mode 100644 index 0000000..589921e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/spring/IgniteSpringProcessor.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.spring; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.processors.resource.*; + +import java.net.*; +import java.util.*; + +/** + * Spring processor which can parse Spring configuration files, interface was introduced to avoid mandatory + * runtime dependency on Spring framework. 
+ */ +public interface IgniteSpringProcessor { + /** + * Loads all grid configurations specified within given configuration file. + * <p> + * Usually Spring XML configuration file will contain only one Grid definition. Note that + * Grid configuration bean(s) is retrieved from configuration file by type, so the name of + * the Grid configuration bean is ignored. + * + * @param cfgUrl Configuration file path or URL. This cannot be {@code null}. + * @param excludedProps Properties to exclude. + * @return Tuple containing all loaded configurations and Spring context used to load them. + * @throws IgniteCheckedException If grid could not be started or configuration + * read. This exception will be thrown also if grid with given name has already + * been started or Spring XML configuration file is invalid. + */ + public IgniteBiTuple<Collection<IgniteConfiguration>, ? extends GridSpringResourceContext> loadConfigurations( + URL cfgUrl, String... excludedProps) throws IgniteCheckedException; + + /** + * Loads bean instances that match the given types from given configuration file. + * + * @param cfgUrl Configuration file path or URL. This cannot be {@code null}. + * @param beanClasses Beans classes. + * @return Bean class -> loaded bean instance map, if configuration does not contain bean with required type the + * map value is {@code null}. + * @throws IgniteCheckedException If failed to load configuration. + */ + public Map<Class<?>, Object> loadBeans(URL cfgUrl, Class<?>... beanClasses) throws IgniteCheckedException; + + /** + * Gets user version for given class loader by checking + * {@code META-INF/gridgain.xml} file for {@code userVersion} attribute. If + * {@code gridgain.xml} file is not found, or user version is not specified there, + * then default version (empty string) is returned. + * + * @param ldr Class loader. + * @param log Logger. + * @return User version for given class loader or empty string if no version + * was explicitly specified. 
+ */ + public String userVersion(ClassLoader ldr, IgniteLogger log); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessorSelfTest.java index 5515ede..e8fd62c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessorSelfTest.java @@ -42,7 +42,7 @@ import static org.apache.ignite.cache.GridCacheAtomicityMode.*; import static org.apache.ignite.cache.GridCacheMode.*; /** - * Tests for {@link GridGgfsProcessor}. + * Tests for {@link IgniteFsProcessor}. */ public class GridGgfsProcessorSelfTest extends GridGgfsCommonAbstractTest { /** Test IP finder. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessorValidationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessorValidationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessorValidationSelfTest.java index 6908c52..b536b93 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessorValidationSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessorValidationSelfTest.java @@ -35,12 +35,12 @@ import static org.apache.ignite.cache.GridCacheAtomicityMode.*; import static org.apache.ignite.cache.GridCacheMode.*; /** - * Tests for node validation logic in {@link GridGgfsProcessor}. + * Tests for node validation logic in {@link IgniteFsProcessor}. * <p> * Tests starting with "testLocal" are checking - * {@link GridGgfsProcessor#validateLocalGgfsConfigurations(org.apache.ignite.fs.IgniteFsConfiguration[])}. + * {@link IgniteFsProcessor#validateLocalGgfsConfigurations(org.apache.ignite.fs.IgniteFsConfiguration[])}. * <p> - * Tests starting with "testRemote" are checking {@link GridGgfsProcessor#checkGgfsOnRemoteNode(org.apache.ignite.cluster.ClusterNode)}. + * Tests starting with "testRemote" are checking {@link IgniteFsProcessor#checkGgfsOnRemoteNode(org.apache.ignite.cluster.ClusterNode)}. */ public class GridGgfsProcessorValidationSelfTest extends GridGgfsCommonAbstractTest { /** IP finder. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0204d759/modules/spring/src/main/java/org/apache/ignite/internal/processors/spring/GridSpringProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/main/java/org/apache/ignite/internal/processors/spring/GridSpringProcessorImpl.java b/modules/spring/src/main/java/org/apache/ignite/internal/processors/spring/GridSpringProcessorImpl.java deleted file mode 100644 index 532e742..0000000 --- a/modules/spring/src/main/java/org/apache/ignite/internal/processors/spring/GridSpringProcessorImpl.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.spring; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.processors.resource.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; -import org.springframework.beans.*; -import org.springframework.beans.factory.*; -import org.springframework.beans.factory.config.*; -import org.springframework.beans.factory.support.*; -import org.springframework.beans.factory.xml.*; -import org.springframework.context.*; -import org.springframework.context.support.*; -import org.springframework.core.io.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * Spring configuration processor. - */ -public class GridSpringProcessorImpl implements GridSpringProcessor { - /** Path to {@code gridgain.xml} file. */ - public static final String GRIDGAIN_XML_PATH = "META-INF/gridgain.xml"; - - /** System class loader user version. */ - private static final AtomicReference<String> SYS_LDR_VER = new AtomicReference<>(null); - - /** - * Try to execute LogFactory.getFactory().setAttribute("org.apache.commons.logging.Log", null) - * to turn off default logging for Spring Framework. - */ - static { - Class<?> logFactoryCls = null; - - try { - logFactoryCls = Class.forName("org.apache.commons.logging.LogFactory"); - } - catch (ClassNotFoundException ignored) { - // No-op. - } - - if (logFactoryCls != null) { - try { - Object factory = logFactoryCls.getMethod("getFactory").invoke(null); - - factory.getClass().getMethod("setAttribute", String.class, Object.class). - invoke(factory, "org.apache.commons.logging.Log", null); - } - catch (Exception ignored) { - // No-op. - } - } - } - - /** {@inheritDoc} */ - @Override public IgniteBiTuple<Collection<IgniteConfiguration>, ? 
extends GridSpringResourceContext> loadConfigurations( - URL cfgUrl, String... excludedProps) throws IgniteCheckedException { - ApplicationContext springCtx; - - try { - springCtx = applicationContext(cfgUrl, excludedProps); - } - catch (BeansException e) { - if (X.hasCause(e, ClassNotFoundException.class)) - throw new IgniteCheckedException("Failed to instantiate Spring XML application context " + - "(make sure all classes used in Spring configuration are present at CLASSPATH) " + - "[springUrl=" + cfgUrl + ']', e); - else - throw new IgniteCheckedException("Failed to instantiate Spring XML application context [springUrl=" + - cfgUrl + ", err=" + e.getMessage() + ']', e); - } - - Map<String, IgniteConfiguration> cfgMap; - - try { - cfgMap = springCtx.getBeansOfType(IgniteConfiguration.class); - } - catch (BeansException e) { - throw new IgniteCheckedException("Failed to instantiate bean [type=" + IgniteConfiguration.class + ", err=" + - e.getMessage() + ']', e); - } - - if (cfgMap == null || cfgMap.isEmpty()) - throw new IgniteCheckedException("Failed to find grid configuration in: " + cfgUrl); - - return F.t(cfgMap.values(), new GridSpringResourceContextImpl(springCtx)); - } - - /** {@inheritDoc} */ - @Override public Map<Class<?>, Object> loadBeans(URL cfgUrl, Class<?>... 
beanClasses) throws IgniteCheckedException { - assert beanClasses.length > 0; - - GenericApplicationContext springCtx; - - try { - springCtx = new GenericApplicationContext(); - - new XmlBeanDefinitionReader(springCtx).loadBeanDefinitions(new UrlResource(cfgUrl)); - - springCtx.refresh(); - } - catch (BeansException e) { - if (X.hasCause(e, ClassNotFoundException.class)) - throw new IgniteCheckedException("Failed to instantiate Spring XML application context " + - "(make sure all classes used in Spring configuration are present at CLASSPATH) " + - "[springUrl=" + cfgUrl + ']', e); - else - throw new IgniteCheckedException("Failed to instantiate Spring XML application context [springUrl=" + - cfgUrl + ", err=" + e.getMessage() + ']', e); - } - - Map<Class<?>, Object> beans = new HashMap<>(); - - for (Class<?> cls : beanClasses) - beans.put(cls, bean(springCtx, cls)); - - return beans; - } - - /** {@inheritDoc} */ - @Override public String userVersion(ClassLoader ldr, IgniteLogger log) { - assert ldr != null; - assert log != null; - - // For system class loader return cached version. - if (ldr == U.gridClassLoader() && SYS_LDR_VER.get() != null) - return SYS_LDR_VER.get(); - - String usrVer = U.DFLT_USER_VERSION; - - InputStream in = ldr.getResourceAsStream(GRIDGAIN_XML_PATH); - - if (in != null) { - // Note: use ByteArrayResource instead of InputStreamResource because - // InputStreamResource doesn't work. - ByteArrayOutputStream out = new ByteArrayOutputStream(); - - try { - U.copy(in, out); - - DefaultListableBeanFactory factory = new DefaultListableBeanFactory(); - - XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(factory); - - reader.loadBeanDefinitions(new ByteArrayResource(out.toByteArray())); - - usrVer = (String)factory.getBean("userVersion"); - - usrVer = usrVer == null ? 
U.DFLT_USER_VERSION : usrVer.trim(); - } - catch (NoSuchBeanDefinitionException ignored) { - if (log.isInfoEnabled()) - log.info("User version is not explicitly defined (will use default version) [file=" + - GRIDGAIN_XML_PATH + ", clsLdr=" + ldr + ']'); - - usrVer = U.DFLT_USER_VERSION; - } - catch (BeansException e) { - U.error(log, "Failed to parse Spring XML file (will use default user version) [file=" + - GRIDGAIN_XML_PATH + ", clsLdr=" + ldr + ']', e); - - usrVer = U.DFLT_USER_VERSION; - } - catch (IOException e) { - U.error(log, "Failed to read Spring XML file (will use default user version) [file=" + - GRIDGAIN_XML_PATH + ", clsLdr=" + ldr + ']', e); - - usrVer = U.DFLT_USER_VERSION; - } - finally { - U.close(out, log); - } - } - - // For system class loader return cached version. - if (ldr == U.gridClassLoader()) - SYS_LDR_VER.compareAndSet(null, usrVer); - - return usrVer; - } - - /** - * Gets bean configuration. - * - * @param ctx Spring context. - * @param beanCls Bean class. - * @return Spring bean. - */ - @Nullable private static <T> T bean(ListableBeanFactory ctx, Class<T> beanCls) { - Map.Entry<String, T> entry = F.firstEntry(ctx.getBeansOfType(beanCls)); - - return entry == null ? null : entry.getValue(); - } - - /** - * Creates Spring application context. Optionally excluded properties can be specified, - * it means that if such a property is found in {@link org.apache.ignite.configuration.IgniteConfiguration} - * then it is removed before the bean is instantiated. - * For example, {@code streamerConfiguration} can be excluded from the configs that Visor uses. - * - * @param cfgUrl Resource where config file is located. - * @param excludedProps Properties to be excluded. - * @return Spring application context. - */ - public static ApplicationContext applicationContext(URL cfgUrl, final String... 
excludedProps) { - GenericApplicationContext springCtx = new GenericApplicationContext(); - - BeanFactoryPostProcessor postProc = new BeanFactoryPostProcessor() { - @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) - throws BeansException { - for (String beanName : beanFactory.getBeanDefinitionNames()) { - BeanDefinition def = beanFactory.getBeanDefinition(beanName); - - if (def.getBeanClassName() != null) { - try { - Class.forName(def.getBeanClassName()); - } - catch (ClassNotFoundException ignored) { - ((BeanDefinitionRegistry)beanFactory).removeBeanDefinition(beanName); - - continue; - } - } - - MutablePropertyValues vals = def.getPropertyValues(); - - for (PropertyValue val : new ArrayList<>(vals.getPropertyValueList())) { - for (String excludedProp : excludedProps) { - if (val.getName().equals(excludedProp)) - vals.removePropertyValue(val); - } - } - } - } - }; - - springCtx.addBeanFactoryPostProcessor(postProc); - - new XmlBeanDefinitionReader(springCtx).loadBeanDefinitions(new UrlResource(cfgUrl)); - - springCtx.refresh(); - - return springCtx; - } -}