# IGNITE-406: Applied patch from Ivan V.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d7755046 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d7755046 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d7755046 Branch: refs/heads/ignite-45 Commit: d7755046610e2195d9d73ad63f0b9ced8420350b Parents: d8c0766 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Fri Mar 13 09:19:11 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Fri Mar 13 09:19:11 2015 +0300 ---------------------------------------------------------------------- .../ignite/igfs/IgfsEventsAbstractSelfTest.java | 3 +- .../processors/igfs/IgfsAbstractSelfTest.java | 229 ++++++++++++++++--- .../igfs/IgfsDualAbstractSelfTest.java | 71 +++--- .../igfs/IgfsExUniversalFileSystemAdapter.java | 87 +++++++ .../igfs/UniversalFileSystemAdapter.java | 79 +++++++ .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 8 + .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 9 +- .../processors/hadoop/igfs/HadoopIgfsIpcIo.java | 14 +- .../processors/hadoop/igfs/HadoopIgfsUtils.java | 7 +- .../ignite/igfs/Hadoop1DualAbstractTest.java | 105 +++++++++ .../igfs/Hadoop1OverIgfsDualAsyncTest.java | 30 +++ .../igfs/Hadoop1OverIgfsDualSyncTest.java | 30 +++ ...oopFileSystemUniversalFileSystemAdapter.java | 113 +++++++++ ...oopSecondaryFileSystemConfigurationTest.java | 12 +- .../testsuites/IgniteHadoopTestSuite.java | 4 +- 15 files changed, 716 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7755046/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java 
b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java index 2d9d269..5ae18ef 100644 --- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java @@ -170,7 +170,8 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest } // Clean up file system. - igfs.format(); + if (igfs != null) + igfs.format(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7755046/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java index 016807e..e69385a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java @@ -104,13 +104,16 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { protected static final IgfsPath FILE_NEW = new IgfsPath(SUBDIR_NEW, "fileNew"); /** Default data chunk (128 bytes). */ - protected static byte[] chunk; + protected static final byte[] chunk = createChunk(128); /** Primary IGFS. */ protected static IgfsImpl igfs; - /** Secondary IGFS. */ - protected static IgfsImpl igfsSecondary; + /** Secondary IGFS */ + protected static IgfsSecondaryFileSystem igfsSecondaryFileSystem; + + /** Secondary file system lower layer "backdoor" wrapped in UniversalFileSystemAdapter: */ + protected static UniversalFileSystemAdapter igfsSecondary; /** IGFS mode. 
*/ protected final IgfsMode mode; @@ -151,22 +154,35 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { dual = mode != PRIMARY; } - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - chunk = new byte[128]; + private static byte[] createChunk(int length) { + byte[] chunk = new byte[length]; for (int i = 0; i < chunk.length; i++) chunk[i] = (byte)i; - Ignite igniteSecondary = startGridWithIgfs("ignite-secondary", "igfs-secondary", PRIMARY, null, SECONDARY_REST_CFG); + return chunk; + } - igfsSecondary = (IgfsImpl) igniteSecondary.fileSystem("igfs-secondary"); + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + igfsSecondaryFileSystem = createSecondaryFileSystemStack(); - Ignite ignite = startGridWithIgfs("ignite", "igfs", mode, igfsSecondary.asSecondary(), PRIMARY_REST_CFG); + Ignite ignite = startGridWithIgfs("ignite", "igfs", mode, igfsSecondaryFileSystem, PRIMARY_REST_CFG); igfs = (IgfsImpl) ignite.fileSystem("igfs"); } + protected IgfsSecondaryFileSystem createSecondaryFileSystemStack() throws Exception { + Ignite igniteSecondary = startGridWithIgfs("ignite-secondary", "igfs-secondary", PRIMARY, null, + SECONDARY_REST_CFG); + + IgfsEx secondaryIgfsImpl = (IgfsEx) igniteSecondary.fileSystem("igfs-secondary"); + + igfsSecondary = new IgfsExUniversalFileSystemAdapter(secondaryIgfsImpl); + + return secondaryIgfsImpl.asSecondary(); + } + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { clear(igfs, igfsSecondary); @@ -640,7 +656,8 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { checkExist(igfs, igfsSecondary, SUBSUBDIR); if (dual) - assertEquals(props, igfsSecondary.info(SUBSUBDIR).properties()); + // Check only permissions because user and group will always be present in Hadoop Fs. 
+ assertEquals(props.get(PROP_PERMISSION), igfsSecondary.properties(SUBSUBDIR.toString()).get(PROP_PERMISSION)); // We check only permission because IGFS client adds username and group name explicitly. assertEquals(props.get(PROP_PERMISSION), igfs.info(SUBSUBDIR).properties().get(PROP_PERMISSION)); @@ -659,7 +676,8 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { checkExist(igfs, igfsSecondary, DIR); if (dual) - assertEquals(props, igfsSecondary.info(DIR).properties()); + // check permission only since Hadoop Fs will always have user and group: + assertEquals(props.get(PROP_PERMISSION), igfsSecondary.properties(DIR.toString()).get(PROP_PERMISSION)); // We check only permission because IGFS client adds username and group name explicitly. assertEquals(props.get(PROP_PERMISSION), igfs.info(DIR).properties().get(PROP_PERMISSION)); @@ -698,15 +716,16 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { */ public void testDeleteDirectoryNotEmpty() throws Exception { create(igfs, paths(DIR, SUBDIR, SUBSUBDIR), paths(FILE)); + checkExist(igfs, igfsSecondary, SUBDIR, SUBSUBDIR, FILE); - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - igfs.delete(SUBDIR, false); + try { + boolean ok = igfs.delete(SUBDIR, false); - return null; - } - }, IgfsDirectoryNotEmptyException.class, "Failed to remove directory (directory is not empty and " + - "recursive flag is not set)"); + assertFalse(ok); + } catch (IgfsDirectoryNotEmptyException idnee) { + // ok, expected + U.debug("Expected: " + idnee); + } checkExist(igfs, igfsSecondary, SUBDIR, SUBSUBDIR, FILE); } @@ -724,7 +743,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { igfs.update(FILE, props); if (dual) - assertEquals(props, igfsSecondary.info(FILE).properties()); + assertEquals(props, igfsSecondary.properties(FILE.toString())); assertEquals(props, igfs.info(FILE).properties()); } @@ 
-742,7 +761,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { igfs.update(DIR, props); if (dual) - assertEquals(props, igfsSecondary.info(DIR).properties()); + assertEquals(props, igfsSecondary.properties(DIR.toString())); assertEquals(props, igfs.info(DIR).properties()); } @@ -859,7 +878,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { * @throws Exception If failed. */ public void testOpenDoesNotExist() throws Exception { - igfsSecondary.delete(FILE, false); + igfsSecondary.delete(FILE.toString(), false); GridTestUtils.assertThrows(log(), new Callable<Object>() { @Override public Object call() throws Exception { @@ -883,7 +902,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { * @throws Exception If failed. */ public void testCreate() throws Exception { - create(igfs.asSecondary(), paths(DIR, SUBDIR), null); + create(igfs, paths(DIR, SUBDIR), null); createFile(igfs.asSecondary(), FILE, true, chunk); @@ -2112,12 +2131,15 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { for (int i = 0; i < lvlCnt; i++) { int lvl = i + 1; - IgfsImpl targetIgfs = dual ? lvl <= primaryLvlCnt ? igfs : igfsSecondary : igfs; + boolean targetToPrimary = !dual || lvl <= primaryLvlCnt; IgfsPath[] dirs = dirPaths.get(lvl).toArray(new IgfsPath[dirPaths.get(lvl).size()]); IgfsPath[] files = filePaths.get(lvl).toArray(new IgfsPath[filePaths.get(lvl).size()]); - create(targetIgfs, dirs, files); + if (targetToPrimary) + create(igfs, dirs, files); + else + create(igfsSecondary, dirs, files); } // Start all threads and wait for them to finish. 
@@ -2163,6 +2185,20 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { } } + public void create(UniversalFileSystemAdapter uni, @Nullable IgfsPath[] dirs, @Nullable IgfsPath[] files) throws Exception { + if (dirs != null) { + for (IgfsPath dir : dirs) + uni.mkdirs(dir.toString()); + } + + if (files != null) { + for (IgfsPath file : files) + try (OutputStream os = uni.openOutputStream(file.toString(), false)) { + // noop + } + } + } + /** * Create the file in the given IGFS and write provided data chunks to it. * @@ -2191,6 +2227,32 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { /** * Create the file in the given IGFS and write provided data chunks to it. * + * @param file File. + * @param overwrite Overwrite flag. + * @param chunks Data chunks. + * @throws IOException In case of IO exception. + */ + protected static void createFile(UniversalFileSystemAdapter uni, IgfsPath file, boolean overwrite, @Nullable byte[]... chunks) + throws IOException { + OutputStream os = null; + + try { + os = uni.openOutputStream(file.toString(), false); + + writeFileChunks(os, chunks); + } + finally { + U.closeQuiet(os); + + IgfsEx igfsEx = uni.getAdapter(IgfsEx.class); + if (igfsEx != null) + awaitFileClose(igfsEx.asSecondary(), file); + } + } + + /** + * Create the file in the given IGFS and write provided data chunks to it. + * * @param igfs IGFS. * @param file File. * @param overwrite Overwrite flag. @@ -2275,7 +2337,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { * @param paths Paths. * @throws Exception If failed. */ - protected void checkExist(IgfsImpl igfs, IgfsImpl igfsSecondary, IgfsPath... paths) throws Exception { + protected void checkExist(IgfsImpl igfs, UniversalFileSystemAdapter igfsSecondary, IgfsPath... 
paths) throws Exception { checkExist(igfs, paths); if (dual) @@ -2298,6 +2360,28 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { } /** + * Ensure that the given paths exist in the given IGFS. + * + * @param uni filesystem. + * @param paths Paths. + * @throws IgniteCheckedException If failed. + */ + protected void checkExist(UniversalFileSystemAdapter uni, IgfsPath... paths) throws IgniteCheckedException { + IgfsEx ex = uni.getAdapter(IgfsEx.class); + for (IgfsPath path : paths) { + if (ex != null) + assert ex.context().meta().fileId(path) != null : "Path doesn't exist [igfs=" + ex.name() + + ", path=" + path + ']'; + + try { + assert uni.exists(path.toString()) : "Path doesn't exist [igfs=" + uni.name() + ", path=" + path + ']'; + } catch (IOException ioe) { + throw new IgniteCheckedException(ioe); + } + } + } + + /** * Ensure that the given paths don't exist in the given IGFSs. * * @param igfs First IGFS. @@ -2305,7 +2389,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { * @param paths Paths. * @throws Exception If failed. */ - protected void checkNotExist(IgfsImpl igfs, IgfsImpl igfsSecondary, IgfsPath... paths) + protected void checkNotExist(IgfsImpl igfs, UniversalFileSystemAdapter igfsSecondary, IgfsPath... paths) throws Exception { checkNotExist(igfs, paths); @@ -2329,6 +2413,24 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { } /** + * Ensure that the given paths don't exist in the given IGFS. + * + * @param uni secondary FS. + * @param paths Paths. + * @throws Exception If failed. + */ + protected void checkNotExist(UniversalFileSystemAdapter uni, IgfsPath... 
paths) throws Exception { + IgfsEx ex = uni.getAdapter(IgfsEx.class); + for (IgfsPath path : paths) { + if (ex != null) + assert ex.context().meta().fileId(path) == null : "Path exists [igfs=" + ex.name() + ", path=" + + path + ']'; + + assert !uni.exists(path.toString()) : "Path exists [igfs=" + uni.name() + ", path=" + path + ']'; + } + } + + /** * Ensure that the given file exists in the given IGFSs and that it has exactly the same content as provided in the * "data" parameter. * @@ -2338,14 +2440,14 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { * @param chunks Expected data. * @throws Exception If failed. */ - protected void checkFile(IgfsImpl igfs, IgfsImpl igfsSecondary, IgfsPath file, + protected void checkFile(IgfsImpl igfs, UniversalFileSystemAdapter igfsSecondary, IgfsPath file, @Nullable byte[]... chunks) throws Exception { checkExist(igfs, file); checkFileContent(igfs, file, chunks); if (dual) { checkExist(igfsSecondary, file); - checkFileContent(igfsSecondary, file, chunks); + checkFileContent(igfsSecondary, file.toString(), chunks); } } @@ -2388,6 +2490,46 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { } /** + * Ensure that the given file has exactly the same content as provided in the "data" parameter. + * + * @param uni FS. + * @param path File. + * @param chunks Expected data. + * @throws IOException In case of IO exception. + * @throws IgniteCheckedException In case of Grid exception. + */ + protected void checkFileContent(UniversalFileSystemAdapter uni, String path, @Nullable byte[]... 
chunks) + throws IOException, IgniteCheckedException { + if (chunks != null && chunks.length > 0) { + InputStream is = null; + + try { + is = uni.openInputStream(path); + + int chunkIdx = 0; + + int read; + for (byte[] chunk: chunks) { + byte[] buf = new byte[chunk.length]; + + read = is.read(buf); + + assert read == chunk.length : "Chunk #" + chunkIdx + " was not read fully."; + assert Arrays.equals(chunk, buf) : "Bad chunk [igfs=" + uni.name() + ", chunkIdx=" + chunkIdx + + ", expected=" + Arrays.toString(chunk) + ", actual=" + Arrays.toString(buf) + ']'; + + chunkIdx++; + } + + is.close(); + } + finally { + U.closeQuiet(is); + } + } + } + + /** * Create map with properties. * * @param username User name. @@ -2428,7 +2570,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { * @param igfsSecondary Second IGFS. * @throws Exception If failed. */ - protected void clear(IgniteFileSystem igfs, IgniteFileSystem igfsSecondary) throws Exception { + protected void clear(IgniteFileSystem igfs, UniversalFileSystemAdapter igfsSecondary) throws Exception { clear(igfs); if (dual) @@ -2459,4 +2601,33 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { // Clear igfs. igfs.format(); } + + /** + * Clear particular {@link UniversalFileSystemAdapter}. + * + * @param uni IGFS. + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public static void clear(UniversalFileSystemAdapter uni) throws Exception { + IgfsEx igfsEx = uni.getAdapter(IgfsEx.class); + + if (igfsEx != null) { + Field workerMapFld = IgfsImpl.class.getDeclaredField("workerMap"); + + workerMapFld.setAccessible(true); + + // Wait for all workers to finish. 
+ Map<IgfsPath, IgfsFileWorker> workerMap = (Map<IgfsPath, IgfsFileWorker>)workerMapFld.get(igfsEx); + + for (Map.Entry<IgfsPath, IgfsFileWorker> entry : workerMap.entrySet()) { + entry.getValue().cancel(); + + U.join(entry.getValue()); + } + } + + // Clear the filesystem: + uni.format(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7755046/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java index 1df79cc..6ad09e9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java @@ -962,7 +962,8 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { checkExist(igfs, SUBDIR); checkExist(igfs, igfsSecondary, SUBSUBDIR); - assertEquals(props, igfsSecondary.info(SUBSUBDIR).properties()); + // Check only permissions because user and group will always be present in Hadoop secondary filesystem. + assertEquals(props.get(PROP_PERMISSION), igfsSecondary.properties(SUBSUBDIR.toString()).get(PROP_PERMISSION)); // We check only permission because IGFS client adds username and group name explicitly. 
assertEquals(props.get(PROP_PERMISSION), igfs.info(SUBSUBDIR).properties().get(PROP_PERMISSION)); @@ -986,7 +987,8 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { checkExist(igfs, SUBDIR); checkExist(igfs, igfsSecondary, SUBSUBDIR); - assertEquals(props, igfsSecondary.info(SUBSUBDIR).properties()); + // Check only permission because in case of Hadoop secondary Fs user and group will always be present: + assertEquals(props.get(PROP_PERMISSION), igfsSecondary.properties(SUBSUBDIR.toString()).get(PROP_PERMISSION)); // We check only permission because IGFS client adds username and group name explicitly. assertEquals(props.get(PROP_PERMISSION), igfs.info(SUBSUBDIR).properties().get(PROP_PERMISSION)); @@ -1049,7 +1051,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { create(igfs, paths(DIR), null); // Set different properties to the sub-directory. - igfsSecondary.update(SUBDIR, propsSubDir); + igfsSecondaryFileSystem.update(SUBDIR, propsSubDir); igfs.update(FILE, propsFile); @@ -1057,10 +1059,10 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { checkExist(igfs, SUBDIR, FILE); // Ensure properties propagation. - assertEquals(propsSubDir, igfsSecondary.info(SUBDIR).properties()); + assertEquals(propsSubDir, igfsSecondary.properties(SUBDIR.toString())); assertEquals(propsSubDir, igfs.info(SUBDIR).properties()); - assertEquals(propsFile, igfsSecondary.info(FILE).properties()); + assertEquals(propsFile, igfsSecondary.properties(FILE.toString())); assertEquals(propsFile, igfs.info(FILE).properties()); } @@ -1077,7 +1079,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { create(igfs, null, null); // Set different properties to the sub-directory. 
- igfsSecondary.update(SUBDIR, propsSubDir); + igfsSecondaryFileSystem.update(SUBDIR, propsSubDir); igfs.update(FILE, propsFile); @@ -1085,10 +1087,10 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { checkExist(igfs, DIR, SUBDIR, FILE); // Ensure properties propagation. - assertEquals(propsSubDir, igfsSecondary.info(SUBDIR).properties()); + assertEquals(propsSubDir, igfsSecondary.properties(SUBDIR.toString())); assertEquals(propsSubDir, igfs.info(SUBDIR).properties()); - assertEquals(propsFile, igfsSecondary.info(FILE).properties()); + assertEquals(propsFile, igfsSecondary.properties(FILE.toString())); assertEquals(propsFile, igfs.info(FILE).properties()); } @@ -1107,7 +1109,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { checkExist(igfs, DIR); - assertEquals(props, igfsSecondary.info(DIR).properties()); + assertEquals(props, igfsSecondary.properties(DIR.toString())); assertEquals(props, igfs.info(DIR).properties()); } @@ -1120,7 +1122,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { create(igfsSecondary, paths(DIR, SUBDIR), null); create(igfs, null, null); - createFile(igfsSecondary.asSecondary(), FILE, true, chunk); + createFile(igfsSecondary, FILE, true, chunk); checkFileContent(igfs, FILE, chunk); } @@ -1136,19 +1138,17 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { // Write enough data to the secondary file system. 
final int blockSize = IGFS_BLOCK_SIZE; - IgfsOutputStream out = igfsSecondary.append(FILE, false); - int totalWritten = 0; + try (OutputStream out = igfsSecondary.openOutputStream(FILE.toString(), false)) { - while (totalWritten < blockSize * 2 + chunk.length) { - out.write(chunk); + while (totalWritten < blockSize * 2 + chunk.length) { + out.write(chunk); - totalWritten += chunk.length; + totalWritten += chunk.length; + } } - out.close(); - - awaitFileClose(igfsSecondary.asSecondary(), FILE); + awaitFileClose(igfsSecondaryFileSystem, FILE); // Read the first block. int totalRead = 0; @@ -1179,7 +1179,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { U.sleep(300); // Remove the file from the secondary file system. - igfsSecondary.delete(FILE, false); + igfsSecondary.delete(FILE.toString(), false); // Let's wait for file will be deleted. U.sleep(300); @@ -1215,19 +1215,16 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { // Write enough data to the secondary file system. final int blockSize = igfs.info(FILE).blockSize(); - IgfsOutputStream out = igfsSecondary.append(FILE, false); - int totalWritten = 0; + try (OutputStream out = igfsSecondary.openOutputStream(FILE.toString(), false)) { + while (totalWritten < blockSize * 2 + chunk.length) { + out.write(chunk); - while (totalWritten < blockSize * 2 + chunk.length) { - out.write(chunk); - - totalWritten += chunk.length; + totalWritten += chunk.length; + } } - out.close(); - - awaitFileClose(igfsSecondary.asSecondary(), FILE); + awaitFileClose(igfsSecondaryFileSystem, FILE); // Read the first two blocks. int totalRead = 0; @@ -1260,7 +1257,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { } // Remove the file from the secondary file system. - igfsSecondary.delete(FILE, false); + igfsSecondary.delete(FILE.toString(), false); // Let's wait for file will be deleted. 
U.sleep(300); @@ -1290,7 +1287,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { create(igfsSecondary, paths(DIR, SUBDIR), null); create(igfs, paths(DIR), null); - igfsSecondary.update(SUBDIR, props); + igfsSecondaryFileSystem.update(SUBDIR, props); createFile(igfs.asSecondary(), FILE, true, chunk); @@ -1314,8 +1311,8 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { create(igfsSecondary, paths(DIR, SUBDIR), null); create(igfs, null, null); - igfsSecondary.update(DIR, propsDir); - igfsSecondary.update(SUBDIR, propsSubDir); + igfsSecondaryFileSystem.update(DIR, propsDir); + igfsSecondaryFileSystem.update(SUBDIR, propsSubDir); createFile(igfs.asSecondary(), FILE, true, chunk); @@ -1338,9 +1335,9 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { create(igfsSecondary, paths(DIR, SUBDIR), null); create(igfs, paths(DIR), null); - igfsSecondary.update(SUBDIR, props); + igfsSecondaryFileSystem.update(SUBDIR, props); - createFile(igfsSecondary, FILE, true, BLOCK_SIZE, chunk); + createFile(igfsSecondary, FILE, true, /*BLOCK_SIZE,*/ chunk); appendFile(igfs, FILE, chunk); @@ -1364,10 +1361,10 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { create(igfsSecondary, paths(DIR, SUBDIR), null); create(igfs, null, null); - igfsSecondary.update(DIR, propsDir); - igfsSecondary.update(SUBDIR, propsSubDir); + igfsSecondaryFileSystem.update(DIR, propsDir); + igfsSecondaryFileSystem.update(SUBDIR, propsSubDir); - createFile(igfsSecondary, FILE, true, BLOCK_SIZE, chunk); + createFile(igfsSecondary, FILE, true, /*BLOCK_SIZE,*/ chunk); appendFile(igfs, FILE, chunk); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7755046/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsExUniversalFileSystemAdapter.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsExUniversalFileSystemAdapter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsExUniversalFileSystemAdapter.java new file mode 100644 index 0000000..8b40b0b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsExUniversalFileSystemAdapter.java @@ -0,0 +1,87 @@ +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.igfs.*; + +import java.io.*; +import java.util.*; + +/** + * Universal adapter over {@link IgfsEx} filesystem. + */ +public class IgfsExUniversalFileSystemAdapter implements UniversalFileSystemAdapter { + + /** The wrapped igfs. */ + private final IgfsEx igfsEx; + + /** + * Constructor. + * @param igfsEx the igfs to be wrapped. + */ + public IgfsExUniversalFileSystemAdapter(IgfsEx igfsEx) { + this.igfsEx = igfsEx; + } + + /** {@inheritDoc} */ + @Override public String name() { + return igfsEx.name(); + } + + /** {@inheritDoc} */ + @Override public boolean exists(String path) { + return igfsEx.exists(new IgfsPath(path)); + } + + /** {@inheritDoc} */ + @Override public void mkdirs(String path) throws IOException { + igfsEx.mkdirs(new IgfsPath(path)); + } + + /** {@inheritDoc} */ + @Override public void format() throws IOException { + igfsEx.format(); + } + + /** {@inheritDoc} */ + @Override public Map<String, String> properties(String path) { + return igfsEx.info(new IgfsPath(path)).properties(); + } + + /** {@inheritDoc} */ + @Override public boolean delete(String path, boolean recursive) throws IOException { + IgfsPath igfsPath = new IgfsPath(path); + + boolean del = igfsEx.delete(igfsPath, recursive); + + return del; + } + + /** {@inheritDoc} */ + @Override public InputStream openInputStream(String path) throws IOException { + IgfsPath igfsPath = new IgfsPath(path); + + IgfsInputStreamAdapter adapter = igfsEx.open(igfsPath); + + return adapter; + } + + /** {@inheritDoc} */ + 
@Override public OutputStream openOutputStream(String path, boolean append) throws IOException { + IgfsPath igfsPath = new IgfsPath(path); + + final IgfsOutputStream igfsOutputStream; + if (append) + igfsOutputStream = igfsEx.append(igfsPath, true/*create*/); + else + igfsOutputStream = igfsEx.create(igfsPath, true/*overwrite*/); + + return igfsOutputStream; + } + + /** {@inheritDoc} */ + @Override public <T> T getAdapter(Class<T> clazz) { + if (clazz == IgfsEx.class) + return (T)igfsEx; + + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7755046/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/UniversalFileSystemAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/UniversalFileSystemAdapter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/UniversalFileSystemAdapter.java new file mode 100644 index 0000000..17a4ea9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/UniversalFileSystemAdapter.java @@ -0,0 +1,79 @@ +package org.apache.ignite.internal.processors.igfs; + +import java.io.*; +import java.util.*; + +/** + * Universal interface to an underlying filesystem. + * Typically used for secondary filesystem. + * To be used solely in tests. + */ +public interface UniversalFileSystemAdapter { + + /** + * Gets name of the FS. + * @return name of this file system. + */ + String name(); + + /** + * Answers if a file denoted by path exists. + * @param path path of the file to check. + * @return if the file exists. + * @throws IOException in case of failure. + */ + boolean exists(String path) throws IOException; + + /** + * Deletes a file or directory. + * @param path the path to delete. + * @param recursive instructs to delete a directory recursively. + * @return true on success, false otherwise. + * @throws IOException On failure. 
+ */ + boolean delete(String path, boolean recursive) throws IOException; + + /** + * Makes directories, creating missing parent directories as needed. + * @param path the directory to create. + * @throws IOException On failure. + */ + void mkdirs(String path) throws IOException; + + /** + * Clears (formats) entire the filesystem. + * All the data in the filesystem are DESTROYED. + * @throws IOException + */ + void format() throws IOException; + + /** + * Gets properties (such as owner, group, and permissions) of a file. + * @param path the path to the file to get properties of. + * @return the properties. + */ + Map<String,String> properties(String path) throws IOException; + + /** + * Opens input stream to read file contents. + * @param path the path to the file. + */ + InputStream openInputStream(String path) throws IOException; + + /** + * Opens output stream to write file contents. + * @param path the path to the file to be written. + * @param append if to append to the end of existing data. + * @return the OutputStream to write into. + * @throws IOException On failure. + */ + OutputStream openOutputStream(String path, boolean append) throws IOException; + + /** + * Gets an entity of the given type (class) associated with this universal adapter. + * @param clazz The class representing the type we wish to adapt to. + * @param <T> The type we need to adapt to. + * @return the adapter object of the given type. 
+ */ + <T> T getAdapter(Class<T> clazz); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7755046/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java index 11003a5..6d0c386 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java @@ -419,4 +419,12 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys throw new IgniteCheckedException(e); } } + + /** + * Gets the underlying {@link FileSystem}. + * @return the underlying Hadoop {@link FileSystem}. + */ + public FileSystem fileSystem() { + return fileSys; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7755046/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index 7c2f3bd..6bdcd8e 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -524,7 +524,7 @@ public class IgniteHadoopFileSystem extends FileSystem { /** {@inheritDoc} */ @SuppressWarnings("deprecation") - @Override public FSDataOutputStream create(Path f, FsPermission perm, boolean overwrite, int bufSize, + @Override public FSDataOutputStream create(Path f, final FsPermission perm, 
boolean overwrite, int bufSize, short replication, long blockSize, Progressable progress) throws IOException { A.notNull(f, "f"); @@ -561,10 +561,13 @@ public class IgniteHadoopFileSystem extends FileSystem { return os; } else { + Map<String,String> propMap = permission(perm); + + propMap.put(PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites)); + // Create stream and close it in the 'finally' section if any sequential operation failed. HadoopIgfsStreamDelegate stream = rmtClient.create(path, overwrite, colocateFileWrites, - replication, blockSize, F.asMap(PROP_PERMISSION, toString(perm), - PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites))); + replication, blockSize, propMap); assert stream != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7755046/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java index b3b981b..2e7254a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java @@ -19,13 +19,12 @@ package org.apache.ignite.internal.processors.hadoop.igfs; import org.apache.commons.logging.*; import org.apache.ignite.*; +import org.apache.ignite.igfs.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.igfs.common.*; import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.ipc.*; import org.apache.ignite.internal.util.ipc.shmem.*; -import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; 
import org.jdk8.backport.*; import org.jetbrains.annotations.*; @@ -238,7 +237,7 @@ public class HadoopIgfsIpcIo implements HadoopIgfsIo { try { endpoint = IpcEndpointFactory.connectEndpoint( - endpointAddr, new GridLoggerProxy(new HadoopIgfsJclLogger(log), null, null, "")); + endpointAddr, new GridLoggerProxy(new HadoopIgfsJclLogger(log), null, null, "")); out = new IgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream())); @@ -558,7 +557,7 @@ public class HadoopIgfsIpcIo implements HadoopIgfsIo { fut.onDone(res); } - catch (IgniteCheckedException e) { + catch (IgfsException | IgniteCheckedException e) { if (log.isDebugEnabled()) log.debug("Failed to apply response closure (will fail request future): " + e.getMessage()); @@ -567,6 +566,11 @@ public class HadoopIgfsIpcIo implements HadoopIgfsIo { err = e; } + catch (Throwable t) { + fut.onDone(t); + + throw t; + } } } } @@ -580,7 +584,7 @@ public class HadoopIgfsIpcIo implements HadoopIgfsIo { err = new HadoopIgfsCommunicationException(e); } - catch (IgniteCheckedException e) { + catch (Throwable e) { if (!stopping) log.error("Failed to obtain endpoint input stream (connection will be closed)", e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7755046/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java index 0e3d127..503b42e 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java @@ -117,8 +117,11 @@ public class HadoopIgfsUtils { return new PathIsNotEmptyDirectoryException(path); else if (path != 
null && e.hasCause(IgfsPathAlreadyExistsException.class)) return new PathExistsException(path); - else - return new IOException(e); + else { + String msg = e.getMessage(); + + return msg == null ? new IOException(e) : new IOException(msg, e); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7755046/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java new file mode 100644 index 0000000..6837735 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.igfs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.ignite.hadoop.fs.*; +import org.apache.ignite.igfs.secondary.*; +import org.apache.ignite.internal.processors.igfs.*; + +import java.io.*; + +import static org.apache.ignite.igfs.HadoopSecondaryFileSystemConfigurationTest.*; +import static org.apache.ignite.igfs.IgfsMode.*; + +/** + * Abstract test for Hadoop 1.0 file system stack. + */ +public abstract class Hadoop1DualAbstractTest extends IgfsDualAbstractSelfTest { + /** Secondary grid name */ + private static final String GRID_NAME = "grid_secondary"; + + /** Secondary file system name */ + private static final String IGFS_NAME = "igfs_secondary"; + + /** Secondary file system REST endpoint port */ + private static final int PORT = 11500; + + /** Secondary file system REST endpoint configuration map. */ + private static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration() {{ + setType(IgfsIpcEndpointType.TCP); + setPort(PORT); + }}; + + /** Secondary file system authority. */ + private static final String SECONDARY_AUTHORITY = IGFS_NAME + ":" + GRID_NAME + "@127.0.0.1:" + PORT; + + /** Secondary Fs configuration full path. */ + protected String secondaryConfFullPath; + + /** Secondary Fs URI. */ + protected String secondaryUri; + + /** Constructor. */ + public Hadoop1DualAbstractTest(IgfsMode mode) { + super(mode); + } + + /** + * Creates secondary filesystems. + * @return IgfsSecondaryFileSystem + * @throws Exception On failure. 
+ */ + @Override protected IgfsSecondaryFileSystem createSecondaryFileSystemStack() throws Exception { + startUnderlying(); + + prepareConfiguration(); + + IgniteHadoopIgfsSecondaryFileSystem second = + new IgniteHadoopIgfsSecondaryFileSystem(secondaryUri, secondaryConfFullPath); + + FileSystem fileSystem = second.fileSystem(); + + igfsSecondary = new HadoopFileSystemUniversalFileSystemAdapter(fileSystem); + + return second; + } + + /** + * Starts underlying Ignite process. + * @throws IOException On failure. + */ + protected void startUnderlying() throws Exception { + startGridWithIgfs(GRID_NAME, IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG); + } + + /** + * Prepares Fs configuration. + * @throws IOException On failure. + */ + protected void prepareConfiguration() throws IOException { + Configuration secondaryConf = configuration(IGFS_SCHEME, SECONDARY_AUTHORITY, true, true); + + secondaryConf.setInt("fs.igfs.block.size", 1024); + + secondaryConfFullPath = writeConfiguration(secondaryConf, SECONDARY_CFG_PATH); + + secondaryUri = mkUri(IGFS_SCHEME, SECONDARY_AUTHORITY); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7755046/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualAsyncTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualAsyncTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualAsyncTest.java new file mode 100644 index 0000000..2b908ca --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualAsyncTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +/** + * DUAL_ASYNC mode test. + */ +public class Hadoop1OverIgfsDualAsyncTest extends Hadoop1DualAbstractTest { + /** + * Constructor. + */ + public Hadoop1OverIgfsDualAsyncTest() { + super(IgfsMode.DUAL_ASYNC); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7755046/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualSyncTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualSyncTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualSyncTest.java new file mode 100644 index 0000000..0a585fa --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualSyncTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +/** + * DUAL_SYNC mode. + */ +public class Hadoop1OverIgfsDualSyncTest extends Hadoop1DualAbstractTest { + /** + * Constructor. + */ + public Hadoop1OverIgfsDualSyncTest() { + super(IgfsMode.DUAL_SYNC); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7755046/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java new file mode 100644 index 0000000..fb0ba0f --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.permission.*; +import org.apache.ignite.internal.processors.igfs.*; + +import java.io.*; +import java.util.*; + +/** + * Universal adapter wrapping {@link org.apache.hadoop.fs.FileSystem} instance. + */ +public class HadoopFileSystemUniversalFileSystemAdapter implements UniversalFileSystemAdapter { + + /** The wrapped filesystem. */ + private final FileSystem fileSys; + + /** + * Constructor. + * @param fs the filesystem to be wrapped. + */ + public HadoopFileSystemUniversalFileSystemAdapter(FileSystem fs) { + this.fileSys = fs; + } + + /** {@inheritDoc} */ + @Override public String name() { + return fileSys.getUri().toString(); + } + + /** {@inheritDoc} */ + @Override public boolean exists(String path) throws IOException { + return fileSys.exists(new Path(path)); + } + + /** {@inheritDoc} */ + @Override public boolean delete(String path, boolean recursive) throws IOException { + boolean ok = fileSys.delete(new Path(path), recursive); + return ok; + } + + /** {@inheritDoc} */ + @Override public void mkdirs(String path) throws IOException { + boolean ok = fileSys.mkdirs(new Path(path)); + if (!ok) + throw new IOException("Failed to mkdirs: " + path); + } + + /** {@inheritDoc} */ + @Override public void format() throws IOException { + fileSys.delete(new Path("/"), true); + } + + /** {@inheritDoc} */ + @Override public Map<String, String> properties(String path) throws IOException { + Path p = new Path(path); + + FileStatus status = 
fileSys.getFileStatus(p); + + Map<String,String> m = new HashMap<>(3); // max size == 4 + + m.put(IgfsEx.PROP_USER_NAME, status.getOwner()); + + m.put(IgfsEx.PROP_GROUP_NAME, status.getGroup()); + + FsPermission perm = status.getPermission(); + + m.put(IgfsEx.PROP_PERMISSION, "0" + perm.getUserAction().ordinal() + perm.getGroupAction().ordinal() + + perm.getOtherAction().ordinal()); + + return m; + } + + /** {@inheritDoc} */ + @Override public InputStream openInputStream(String path) throws IOException { + return fileSys.open(new Path(path)); + } + + /** {@inheritDoc} */ + @Override public OutputStream openOutputStream(String path, boolean append) throws IOException { + Path p = new Path(path); + + if (append) + return fileSys.append(p); + else + return fileSys.create(p, true/*overwrite*/); + } + + /** {@inheritDoc} */ + @Override public <T> T getAdapter(Class<T> clazz) { + if (clazz == FileSystem.class) + return (T)fileSys; + + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7755046/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java index dbe2449..2679e03 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java @@ -53,7 +53,7 @@ import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEnd */ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstractTest { /** IGFS scheme */ - private static final String IGFS_SCHEME = "igfs"; + static final String IGFS_SCHEME = "igfs"; /** Primary file system 
authority. */ private static final String PRIMARY_AUTHORITY = "igfs:grid0@"; @@ -65,7 +65,7 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra private static final String SECONDARY_AUTHORITY = "igfs_secondary:grid_secondary@127.0.0.1:11500"; /** Autogenerated secondary file system configuration path. */ - private static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml"; + static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml"; /** Secondary endpoint configuration. */ protected static final IgfsIpcEndpointConfiguration SECONDARY_ENDPOINT_CFG; @@ -475,7 +475,7 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra * @param skipLocShmem Whether to skip local shmem mode. * @return Configuration. */ - private static Configuration configuration(String scheme, String authority, boolean skipEmbed, boolean skipLocShmem) { + static Configuration configuration(String scheme, String authority, boolean skipEmbed, boolean skipLocShmem) { final Configuration cfg = new Configuration(); if (scheme != null && authority != null) @@ -499,7 +499,7 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra * * @param cfg the configuration to set parameters into. */ - private static void setImplClasses(Configuration cfg) { + static void setImplClasses(Configuration cfg) { cfg.set("fs.igfs.impl", IgniteHadoopFileSystem.class.getName()); cfg.set("fs.AbstractFileSystem.igfs.impl", @@ -528,7 +528,7 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra * @param pathFromIgniteHome path relatively to Ignite home. * @return Full path of the written configuration. 
*/ - private static String writeConfiguration(Configuration cfg, String pathFromIgniteHome) throws IOException { + static String writeConfiguration(Configuration cfg, String pathFromIgniteHome) throws IOException { if (!pathFromIgniteHome.startsWith("/")) pathFromIgniteHome = "/" + pathFromIgniteHome; @@ -558,7 +558,7 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra * @param authority the authority * @return URI String */ - private static String mkUri(String scheme, String authority) { + static String mkUri(String scheme, String authority) { return scheme + "://" + authority + "/"; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7755046/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index 4d2a4b6..985dbb2 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -26,8 +26,6 @@ import org.apache.ignite.igfs.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*; import org.apache.ignite.internal.processors.hadoop.shuffle.streams.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -122,6 +120,8 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new TestSuite(ldr.loadClass(HadoopSecondaryFileSystemConfigurationTest.class.getName()))); + suite.addTest(new 
TestSuite(ldr.loadClass(Hadoop1OverIgfsDualSyncTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualAsyncTest.class.getName()))); return suite; }