[IGNITE-349]: unit test added.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/66b00a1b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/66b00a1b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/66b00a1b Branch: refs/heads/ignite-349 Commit: 66b00a1bc2f1d4775ab5c369a1df61d6d8922afb Parents: 83dabe6 Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Wed Mar 4 16:32:31 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Wed Mar 4 16:32:31 2015 +0300 ---------------------------------------------------------------------- .../igfs/hadoop/v1/IgfsHadoopFileSystem.java | 8 +- .../igfs/hadoop/v2/IgfsHadoopFileSystem.java | 8 +- .../hadoop/SecondaryFileSystemProvider.java | 4 +- .../SecondaryFileSystemConfigurationTest.java | 536 +++++++++++++++++++ 4 files changed, 551 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66b00a1b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java index 2f5278e..280a28b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java @@ -272,11 +272,15 @@ public class IgfsHadoopFileSystem extends FileSystem { boolean initSecondary = paths.defaultMode() == PROXY; - if (paths.pathModes() != null && !paths.pathModes().isEmpty()) { + if (!initSecondary && paths.pathModes() != null && !paths.pathModes().isEmpty()) { for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) { IgfsMode mode = 
pathMode.getValue(); - initSecondary |= mode == PROXY; + if (mode == PROXY) { + initSecondary = true; + + break; + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66b00a1b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java index 81585ff..340b22a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java @@ -267,11 +267,15 @@ public class IgfsHadoopFileSystem extends AbstractFileSystem implements Closeabl boolean initSecondary = paths.defaultMode() == PROXY; - if (paths.pathModes() != null) { + if (!initSecondary && paths.pathModes() != null) { for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) { IgfsMode mode = pathMode.getValue(); - initSecondary |= mode == PROXY; + if (mode == PROXY) { + initSecondary = true; + + break; + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66b00a1b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java index 722cc6c..5c9ec83 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java @@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; +import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -41,7 +42,8 @@ public class SecondaryFileSystemProvider { * * @param secUri the secondary Fs URI (optional). If not given explicitly, it must be specified as "fs.defaultFS" * property in the provided configuration. - * @param secConfPath the secondary Fs config URL (optional) + * @param secConfPath the secondary Fs configuration path (file path on the local file system, optional). + * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path is resolved. * @throws IOException */ public SecondaryFileSystemProvider(final @Nullable String secUri, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66b00a1b/modules/hadoop/src/test/java/org/apache/ignite/igfs/SecondaryFileSystemConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/SecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/SecondaryFileSystemConfigurationTest.java new file mode 100644 index 0000000..f75ca2a --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/SecondaryFileSystemConfigurationTest.java @@ -0,0 +1,536 @@ +package org.apache.ignite.igfs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.permission.*; +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.igfs.hadoop.*; +import org.apache.ignite.igfs.hadoop.v1.*; +import org.apache.ignite.internal.igfs.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.util.typedef.*; +import 
org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.communication.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.igfs.IgfsMode.*; +import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.*; + +/** + * Tests secondary file system configuration. + */ +public class SecondaryFileSystemConfigurationTest extends IgfsCommonAbstractTest { + /** IGFS scheme. */ + private static final String IGFS_SCHEME = "igfs"; + + /** Primary file system authority. */ + private static final String PRIMARY_AUTHORITY = "igfs:grid0@"; + + /** Autogenerated primary file system configuration path. */ + private static final String PRIMARY_CFG_PATH = "/work/core-site-primary-test.xml"; + + /** Secondary file system authority. */ + private static final String SECONDARY_AUTHORITY = "igfs_secondary:grid_secondary@127.0.0.1:11500"; + + /** Autogenerated secondary file system configuration path. */ + private static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml"; + + /** Secondary endpoint configuration. */ + protected static final Map<String, String> SECONDARY_ENDPOINT_CFG = new HashMap<String, String>() {{ + put("type", "tcp"); + put("port", "11500"); + }}; + + /** Group size. */ + public static final int GRP_SIZE = 128; + + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Primary file system URI. */ + protected URI primaryFsUri; + + /** Primary file system. 
*/ + private FileSystem primaryFs; + + /** Full path of the primary Fs configuration. */ + private String primaryConfFullPath; + + /** Input primary Fs URI. */ + private String primaryFsUriStr; + + /** URI scheme written to the primary Fs configuration. */ + private String primaryCfgScheme; + + /** URI authority written to the primary Fs configuration. */ + private String primaryCfgAuthority; + + /** Whether to write the primary Fs configuration and pass its path. */ + private boolean passPrimaryConfiguration; + + /** Full path of the secondary Fs configuration. */ + private String secondaryConfFullPath; + + /** Input secondary Fs URI. */ + private String secondaryFsUriStr; + + /** URI scheme written to the secondary Fs configuration. */ + private String secondaryCfgScheme; + + /** URI authority written to the secondary Fs configuration. */ + private String secondaryCfgAuthority; + + /** Whether to write the secondary Fs configuration and pass its path. */ + private boolean passSecondaryConfiguration; + + /** Default IGFS mode. */ + protected final IgfsMode mode; + + /** Skip embedded mode flag. */ + private final boolean skipEmbed; + + /** Skip local shmem flag. */ + private final boolean skipLocShmem; + + /** + * Constructor. + * + * @param mode Default IGFS mode. + * @param skipEmbed Whether to skip embedded mode. + * @param skipLocShmem Whether to skip local shmem mode. + */ + protected SecondaryFileSystemConfigurationTest(IgfsMode mode, boolean skipEmbed, boolean skipLocShmem) { + this.mode = mode; + this.skipEmbed = skipEmbed; + this.skipLocShmem = skipLocShmem; + } + + /** + * Default constructor. + */ + public SecondaryFileSystemConfigurationTest() { + this(PROXY, true, false); + } + + /** + * Executes before each test. 
+ * @throws Exception + */ + private void before() throws Exception { + initSecondary(); + + if (passPrimaryConfiguration) { + Configuration primaryFsCfg = configuration(primaryCfgScheme, primaryCfgAuthority, skipEmbed, skipLocShmem); + + primaryConfFullPath = writeConfiguration(primaryFsCfg, PRIMARY_CFG_PATH); + } else + primaryConfFullPath = null; + + SecondaryFileSystemProvider provider = + new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath); + + primaryFs = provider.createFileSystem(); + + primaryFsUri = provider.uri(); + } + + /** + * Executes after each test. + * @throws Exception + */ + private void after() throws Exception { + if (primaryFs != null) { + try { + primaryFs.delete(new Path("/"), true); + } + catch (Exception ignore) { + // No-op. + } + + U.closeQuiet(primaryFs); + } + + G.stopAll(true); + + delete(primaryConfFullPath); + delete(secondaryConfFullPath); + } + + /** + * Utility method to delete file. + * + * @param file the file path to delete. + */ + private static void delete(String file) { + if (file != null) { + new File(file).delete(); + + assertFalse(new File(file).exists()); + } + } + + /** + * Initialize underlying secondary filesystem. + * + * @throws Exception + */ + private void initSecondary() throws Exception { + if (passSecondaryConfiguration) { + Configuration secondaryConf = configuration(secondaryCfgScheme, secondaryCfgAuthority, true, true); + + secondaryConf.setInt("fs.igfs.block.size", 1024); + + secondaryConfFullPath = writeConfiguration(secondaryConf, SECONDARY_CFG_PATH); + } else + secondaryConfFullPath = null; + + startNodes(); + } + + /** + * Starts the nodes for this test. + * + * @throws Exception If failed. 
+ */ + private void startNodes() throws Exception { + if (mode != PRIMARY) + startSecondary(); + + startGrids(4); + } + + /** + * Starts secondary IGFS + */ + private void startSecondary() { + IgfsConfiguration igfsCfg = new IgfsConfiguration(); + + igfsCfg.setDataCacheName("partitioned"); + igfsCfg.setMetaCacheName("replicated"); + igfsCfg.setName("igfs_secondary"); + igfsCfg.setIpcEndpointConfiguration(SECONDARY_ENDPOINT_CFG); + igfsCfg.setBlockSize(512 * 1024); + igfsCfg.setPrefetchBlocks(1); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName("partitioned"); + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE)); + cacheCfg.setBackups(0); + cacheCfg.setQueryIndexEnabled(false); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("replicated"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setQueryIndexEnabled(false); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName("grid_secondary"); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(metaCacheCfg, cacheCfg); + cfg.setIgfsConfiguration(igfsCfg); + cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + + cfg.setCommunicationSpi(communicationSpi()); + + G.start(cfg); + } + + /** + * Get primary IPC endpoint configuration. + * + * @param gridName Grid name. + * @return IPC primary endpoint configuration. 
+ */ + protected Map<String, String> primaryIpcEndpointConfiguration(final String gridName) { + return new HashMap<String, String>() {{ + put("type", "tcp"); + put("port", String.valueOf(DFLT_IPC_PORT + getTestGridIndex(gridName))); + }}; + } + + /** {@inheritDoc} */ + @Override public String getTestGridName() { + return "grid"; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(cacheConfiguration()); + cfg.setIgfsConfiguration(igfsConfiguration(gridName)); + cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + cfg.setCommunicationSpi(communicationSpi()); + + return cfg; + } + + /** + * Gets cache configuration. + * + * @return Cache configuration. + */ + protected CacheConfiguration[] cacheConfiguration() { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName("partitioned"); + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE)); + cacheCfg.setBackups(0); + cacheCfg.setQueryIndexEnabled(false); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("replicated"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setQueryIndexEnabled(false); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + return new CacheConfiguration[] {metaCacheCfg, cacheCfg}; + } + + /** + * Gets IGFS configuration. + * + * @param gridName Grid name. 
+ * @return IGFS configuration. + */ + protected IgfsConfiguration igfsConfiguration(String gridName) throws IgniteCheckedException { + IgfsConfiguration cfg = new IgfsConfiguration(); + + cfg.setDataCacheName("partitioned"); + cfg.setMetaCacheName("replicated"); + cfg.setName("igfs"); + cfg.setPrefetchBlocks(1); + cfg.setDefaultMode(mode); + + if (mode != PRIMARY) + cfg.setSecondaryFileSystem( + new IgfsHadoopFileSystemWrapper(secondaryFsUriStr, secondaryConfFullPath)); + + cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName)); + + cfg.setManagementPort(-1); + cfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups. + + return cfg; + } + + /** @return Communication SPI. */ + private CommunicationSpi communicationSpi() { + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + return commSpi; + } + + /** @throws Exception If failed. */ + @SuppressWarnings("deprecation") + private void createBaseTestImpl() throws Exception { + Path fsHome = new Path(primaryFsUri); + Path dir = new Path(fsHome, "/someDir1/someDir2/someDir3"); + Path file = new Path(dir, "someFile"); + + assertPathDoesNotExist(primaryFs, file); + + FsPermission fsPerm = new FsPermission((short)644); + + FSDataOutputStream os = primaryFs.create(file, fsPerm, false, 1, (short)1, 1L, null); + + // Try to write something in file. + os.write("abc".getBytes()); + + os.close(); + + // Check file status. + FileStatus fileStatus = primaryFs.getFileStatus(file); + + assertFalse(fileStatus.isDir()); + assertEquals(file, fileStatus.getPath()); + assertEquals(fsPerm, fileStatus.getPermission()); + } + + /** + * Case #SecondaryFileSystemProvider(null, path) + * + * @throws Exception On failure. 
*/ + public void testFsConfigurationOnly() throws Exception { + primaryCfgScheme = IGFS_SCHEME; + primaryCfgAuthority = PRIMARY_AUTHORITY; + passPrimaryConfiguration = true; + primaryFsUriStr = null; + + // correct secondary URI is set in the configuration only (no explicit URI is passed): + secondaryCfgScheme = IGFS_SCHEME; + secondaryCfgAuthority = SECONDARY_AUTHORITY; + passSecondaryConfiguration = true; + secondaryFsUriStr = null; + + before(); + try { + createBaseTestImpl(); + } finally { + after(); + } + } + + /** + * Case #SecondaryFileSystemProvider(uri, path), when 'uri' parameter overrides + * the Fs uri set in the configuration. + * + * @throws Exception On failure. + */ + public void testFsUriOverridesUriInConfiguration() throws Exception { + // wrong primary URI in the configuration: + primaryCfgScheme = "foo"; + primaryCfgAuthority = "moo:zoo@bee"; + passPrimaryConfiguration = true; + primaryFsUriStr = mkUri(IGFS_SCHEME, PRIMARY_AUTHORITY); + + // wrong secondary URI in the configuration: + secondaryCfgScheme = "foo"; + secondaryCfgAuthority = "moo:zoo@bee"; + passSecondaryConfiguration = true; + secondaryFsUriStr = mkUri(IGFS_SCHEME, SECONDARY_AUTHORITY); + + before(); + try { + createBaseTestImpl(); + } finally { + after(); + } + } + + /** + * Create configuration for test. + * + * @param skipEmbed Whether to skip embedded mode. + * @param skipLocShmem Whether to skip local shmem mode. + * @return Configuration. 
*/ + private static Configuration configuration(String scheme, String authority, boolean skipEmbed, boolean skipLocShmem) { + final Configuration cfg = new Configuration(); + + if (scheme != null && authority != null) + cfg.set("fs.defaultFS", scheme + "://" + authority + "/"); + + setImplClasses(cfg); + + if (authority != null) { + if (skipEmbed) + cfg.setBoolean(String.format(IgfsHadoopUtils.PARAM_IGFS_ENDPOINT_NO_EMBED, authority), true); + + if (skipLocShmem) + cfg.setBoolean(String.format(IgfsHadoopUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority), true); + } + + return cfg; + } + + /** + * Sets Hadoop Fs implementation classes. + * + * @param cfg the configuration to set parameters into. + */ + private static void setImplClasses(Configuration cfg) { + cfg.set("fs.igfs.impl", IgfsHadoopFileSystem.class.getName()); + + cfg.set("fs.AbstractFileSystem.igfs.impl", + org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem.class.getName()); + } + + /** + * Checks that the path does not exist in the given FileSystem. + * + * @param fs FileSystem to check. + * @param path Path to check. + */ + private void assertPathDoesNotExist(final FileSystem fs, final Path path) { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fs.getFileStatus(path); + } + }, FileNotFoundException.class, null); + } + + /** + * Writes the configuration to the local disk and returns its path. + * + * @param cfg the configuration to write. + * @param pathFromIgniteHome path relative to Ignite home. + * @return Full path of the written configuration. 
+ */ + private static String writeConfiguration(Configuration cfg, String pathFromIgniteHome) throws IOException { + if (!pathFromIgniteHome.startsWith("/")) + pathFromIgniteHome = "/" + pathFromIgniteHome; + + final String path = U.getIgniteHome() + pathFromIgniteHome; + + delete(path); + + File file = new File(path); + + try (FileOutputStream fos = new FileOutputStream(file)) { + cfg.writeXml(fos); + } + + assertTrue(file.exists()); + return path; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 3 * 60 * 1000; + } + + /** + * Makes URI. + * + * @param scheme the scheme + * @param authority the authority + * @return URI String + */ + private static String mkUri(String scheme, String authority) { + return scheme + "://" + authority + "/"; + } +}