# IGNITE-386: Reworked API in Hadoop module.
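For quick orientation, this commit moves the public Hadoop integration classes out of `org.apache.ignite.igfs.hadoop.*` and `org.apache.ignite.client.hadoop.*` into the new `org.apache.ignite.hadoop.*` packages. Below is a minimal sketch of what the rename means for client code, using only the new driver class names that appear in the configuration diffs further down; the helper class and method here are illustrative, not part of the commit.

```java
import org.apache.hadoop.conf.Configuration;

/** Hypothetical helper: programmatic equivalent of the core-site.xml entries updated in this commit. */
public final class IgfsDriverConfig {
    /** Returns a Hadoop configuration wired to the renamed IGFS driver classes. */
    public static Configuration configure() {
        Configuration conf = new Configuration();

        // Hadoop 1.x FileSystem driver for 'igfs://' URIs
        // (previously org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem).
        conf.set("fs.igfs.impl", "org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem");

        // Hadoop 2.x AbstractFileSystem driver
        // (previously org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem).
        conf.set("fs.AbstractFileSystem.igfs.impl", "org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem");

        return conf;
    }

    private IgfsDriverConfig() {
        // No instances.
    }
}
```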
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7d46deb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7d46deb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7d46deb2

Branch: refs/heads/ignite-386
Commit: 7d46deb24cdd881c48c05b17fc1cb3a3562d54b9
Parents: e1f6636
Author: vozerov-gridgain <voze...@gridgain.com>
Authored: Tue Mar 3 14:05:54 2015 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Tue Mar 3 14:05:55 2015 +0300

----------------------------------------------------------------------
 config/hadoop/default-config.xml | 2 +-
 docs/core-site.ignite.xml | 4 +-
 examples/config/filesystem/core-site.xml | 4 +-
 .../main/java/org/apache/ignite/IgniteFs.java | 4 +-
 .../ignite/configuration/IgfsConfiguration.java | 8 +-
 .../org/apache/ignite_new/IgniteFileSystem.java | 4 +-
 .../configuration/FileSystemConfiguration.java | 11 +-
 .../hadoop/core-site-loopback-secondary.xml | 4 +-
 .../test/config/hadoop/core-site-loopback.xml | 4 +-
 .../test/config/hadoop/core-site-secondary.xml | 4 +-
 .../core/src/test/config/hadoop/core-site.xml | 4 +-
 .../client/hadoop/GridHadoopClientProtocol.java | 334 ---
 .../GridHadoopClientProtocolProvider.java | 137 -
 .../counter/GridHadoopClientCounterGroup.java | 121 -
 .../counter/GridHadoopClientCounters.java | 217 --
 .../apache/ignite/client/hadoop/package.html | 24 -
 .../fs/IgniteHadoopSecondaryFileSystem.java | 413 +++
 .../org/apache/ignite/hadoop/fs/package.html | 24 +
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 1254 ++++++++++
 .../org/apache/ignite/hadoop/fs/v1/package.html | 24 +
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java | 1008 ++++++++
 .../org/apache/ignite/hadoop/fs/v2/package.html | 24 +
 .../IgniteHadoopClientProtocolProvider.java | 138 +
 .../apache/ignite/hadoop/mapreduce/package.html | 24 +
 .../java/org/apache/ignite/hadoop/package.html | 24 +
 .../hadoop/IgfsHadoopFileSystemWrapper.java | 412 ---
 .../igfs/hadoop/IgfsHadoopParameters.java | 94 -
 .../org/apache/ignite/igfs/hadoop/package.html | 24 -
 .../igfs/hadoop/v1/IgfsHadoopFileSystem.java | 1254 ----------
 .../apache/ignite/igfs/hadoop/v1/package.html | 24 -
 .../igfs/hadoop/v2/IgfsHadoopFileSystem.java | 1008 --------
 .../apache/ignite/igfs/hadoop/v2/package.html | 24 -
 .../java/org/apache/ignite/igfs/package.html | 24 -
 .../processors/hadoop/HadoopCounterGroup.java | 121 +
 .../processors/hadoop/HadoopCounters.java | 216 ++
 .../hadoop/fs/GridHadoopFileSystemsUtils.java | 6 +-
 .../processors/hadoop/fs/HadoopParameters.java | 94 +
 .../hadoop/proto/HadoopClientProtocol.java | 332 +++
 .../hadoop/v2/GridHadoopV2TaskContext.java | 2 +-
 .../fs/IgniteHadoopSecondaryFileSystem.java | 413 ---
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 1254 ----------
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java | 1008 --------
 .../mapreduce/IgniteHadoopCounterGroup.java | 121 -
 .../hadoop/mapreduce/IgniteHadoopCounters.java | 217 --
 .../protocol/IgniteHadoopClientProtocol.java | 334 ---
 .../IgniteHadoopClientProtocolProvider.java | 137 -
 ...op.mapreduce.protocol.ClientProtocolProvider | 2 +-
 ...ridHadoopClientProtocolEmbeddedSelfTest.java | 34 -
 .../GridHadoopClientProtocolSelfTest.java | 633 -----
 .../HadoopClientProtocolEmbeddedSelfTest.java | 34 +
 .../hadoop/HadoopClientProtocolSelfTest.java | 635 +++++
 .../apache/ignite/igfs/IgfsEventsTestSuite.java | 6 +-
 .../IgfsHadoop20FileSystemAbstractSelfTest.java | 4 +-
 .../igfs/IgfsHadoopDualAbstractSelfTest.java | 6 +-
 .../IgfsHadoopFileSystemAbstractSelfTest.java | 2366 ------------------
 .../IgfsHadoopFileSystemClientSelfTest.java | 199 --
 .../IgfsHadoopFileSystemHandshakeSelfTest.java | 311 ---
 .../IgfsHadoopFileSystemIpcCacheSelfTest.java | 207 --
 .../IgfsHadoopFileSystemLoggerSelfTest.java | 287 ---
 ...IgfsHadoopFileSystemLoggerStateSelfTest.java | 325 ---
 ...adoopFileSystemLoopbackAbstractSelfTest.java | 46 -
 ...SystemLoopbackEmbeddedDualAsyncSelfTest.java | 33 -
 ...eSystemLoopbackEmbeddedDualSyncSelfTest.java | 33 -
 ...leSystemLoopbackEmbeddedPrimarySelfTest.java | 33 -
 ...SystemLoopbackEmbeddedSecondarySelfTest.java | 34 -
 ...SystemLoopbackExternalDualAsyncSelfTest.java | 33 -
 ...eSystemLoopbackExternalDualSyncSelfTest.java | 33 -
 ...leSystemLoopbackExternalPrimarySelfTest.java | 33 -
 ...SystemLoopbackExternalSecondarySelfTest.java | 34 -
 ...fsHadoopFileSystemSecondaryModeSelfTest.java | 319 ---
 ...fsHadoopFileSystemShmemAbstractSelfTest.java | 88 -
 ...ileSystemShmemEmbeddedDualAsyncSelfTest.java | 33 -
 ...FileSystemShmemEmbeddedDualSyncSelfTest.java | 33 -
 ...pFileSystemShmemEmbeddedPrimarySelfTest.java | 33 -
 ...ileSystemShmemEmbeddedSecondarySelfTest.java | 33 -
 ...ileSystemShmemExternalDualAsyncSelfTest.java | 33 -
 ...FileSystemShmemExternalDualSyncSelfTest.java | 33 -
 ...pFileSystemShmemExternalPrimarySelfTest.java | 33 -
 ...ileSystemShmemExternalSecondarySelfTest.java | 33 -
 .../IgniteHadoopFileSystemAbstractSelfTest.java | 2366 ++++++++++++++++++
 .../IgniteHadoopFileSystemClientSelfTest.java | 199 ++
 ...IgniteHadoopFileSystemHandshakeSelfTest.java | 311 +++
 .../IgniteHadoopFileSystemIpcCacheSelfTest.java | 207 ++
 .../IgniteHadoopFileSystemLoggerSelfTest.java | 287 +++
 ...niteHadoopFileSystemLoggerStateSelfTest.java | 325 +++
 ...adoopFileSystemLoopbackAbstractSelfTest.java | 46 +
 ...SystemLoopbackEmbeddedDualAsyncSelfTest.java | 33 +
 ...eSystemLoopbackEmbeddedDualSyncSelfTest.java | 33 +
 ...leSystemLoopbackEmbeddedPrimarySelfTest.java | 33 +
 ...SystemLoopbackEmbeddedSecondarySelfTest.java | 34 +
 ...SystemLoopbackExternalDualAsyncSelfTest.java | 33 +
 ...eSystemLoopbackExternalDualSyncSelfTest.java | 33 +
 ...leSystemLoopbackExternalPrimarySelfTest.java | 33 +
 ...SystemLoopbackExternalSecondarySelfTest.java | 34 +
 ...teHadoopFileSystemSecondaryModeSelfTest.java | 319 +++
 ...teHadoopFileSystemShmemAbstractSelfTest.java | 88 +
 ...ileSystemShmemEmbeddedDualAsyncSelfTest.java | 33 +
 ...FileSystemShmemEmbeddedDualSyncSelfTest.java | 33 +
 ...pFileSystemShmemEmbeddedPrimarySelfTest.java | 33 +
 ...ileSystemShmemEmbeddedSecondarySelfTest.java | 33 +
 ...ileSystemShmemExternalDualAsyncSelfTest.java | 33 +
 ...FileSystemShmemExternalDualSyncSelfTest.java | 33 +
 ...pFileSystemShmemExternalPrimarySelfTest.java | 33 +
 ...ileSystemShmemExternalSecondarySelfTest.java | 33 +
 .../hadoop/GridHadoopAbstractSelfTest.java | 6 +-
 .../processors/hadoop/GridHadoopStartup.java | 7 +-
 .../hadoop/GridHadoopTaskExecutionSelfTest.java | 8 +-
 .../testsuites/IgniteHadoopTestSuite.java | 30 +-
 .../IgniteIgfsLinuxAndMacOSTestSuite.java | 18 +-
 pom.xml | 4 +-
 110 files changed, 9119 insertions(+), 12602 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/config/hadoop/default-config.xml
----------------------------------------------------------------------
diff --git a/config/hadoop/default-config.xml b/config/hadoop/default-config.xml
index a264749..8f5854f 100644
--- a/config/hadoop/default-config.xml
+++ b/config/hadoop/default-config.xml
@@ -135,7 +135,7 @@
                         <property name="defaultMode" value="PROXY"/>
 
                         <property name="secondaryFileSystem">
-                            <bean class="org.apache.ignite.igfs.hadoop.IgfsHadoopFileSystemWrapper">
+                            <bean class="org.apache.ignite.hadoop.fs.IgniteHadoopSecondaryFileSystem">
                                 <constructor-arg name="uri" value="hdfs://1.2.3.4:9000"/>
                                 <constructor-arg name="cfgPath" value="/opt/hadoop-server/etc/hadoop/core-site.xml"/>
                             </bean>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/docs/core-site.ignite.xml
----------------------------------------------------------------------
diff --git a/docs/core-site.ignite.xml b/docs/core-site.ignite.xml
index ed11a79..1146576 100644
--- a/docs/core-site.ignite.xml
+++ b/docs/core-site.ignite.xml
@@ -48,7 +48,7 @@
     -->
     <property>
         <name>fs.igfs.impl</name>
-        <value>org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem</value>
+        <value>org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem</value>
     </property>
 
     <!--
@@ -56,7 +56,7 @@
     -->
     <property>
         <name>fs.AbstractFileSystem.igfs.impl</name>
-        <value>org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem</value>
+        <value>org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem</value>
     </property>
 
     <!--

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/examples/config/filesystem/core-site.xml
----------------------------------------------------------------------
diff --git a/examples/config/filesystem/core-site.xml b/examples/config/filesystem/core-site.xml
index 7c6cfaa..b6f0291 100644
--- a/examples/config/filesystem/core-site.xml
+++ b/examples/config/filesystem/core-site.xml
@@ -31,12 +31,12 @@
     <property>
         <!-- FS driver class for the 'igfs://' URIs. -->
         <name>fs.igfs.impl</name>
-        <value>org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem</value>
+        <value>org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem</value>
     </property>
 
     <property>
         <!-- FS driver class for the 'igfs://' URIs in Hadoop2.x -->
         <name>fs.AbstractFileSystem.igfs.impl</name>
-        <value>org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem</value>
+        <value>org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem</value>
     </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/core/src/main/java/org/apache/ignite/IgniteFs.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteFs.java b/modules/core/src/main/java/org/apache/ignite/IgniteFs.java
index dc11973..83fba8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteFs.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteFs.java
@@ -51,8 +51,8 @@ import java.util.*;
  * <h1 class="header">Integration With Hadoop</h1>
  * In addition to direct file system API, {@code IGFS} can be integrated with {@code Hadoop} by
  * plugging in as {@code Hadoop FileSystem}. Refer to
- * {@code org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem} or
- * {@code org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem} for more information.
+ * {@code org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem} or
+ * {@code org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem} for more information.
  * <p>
  * <b>NOTE:</b> integration with Hadoop is available only in {@code In-Memory Accelerator For Hadoop} edition.
 */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/core/src/main/java/org/apache/ignite/configuration/IgfsConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgfsConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgfsConfiguration.java
index 308471d..2ad79fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgfsConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgfsConfiguration.java
@@ -313,8 +313,8 @@ public class IgfsConfiguration {
      * Default is {@code 0} which means that pre-fetching will start right away.
      * <h1 class="header">Integration With Hadoop</h1>
      * This parameter can be also overridden for individual Hadoop MapReduce tasks by passing
-     * {@code org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH}
-     * configuration property directly to Hadoop MapReduce task.
+     * {@code fs.igfs.[name].open.sequential_reads_before_prefetch} configuration property directly to Hadoop
+     * MapReduce task.
      * <p>
      * <b>NOTE:</b> Integration with Hadoop is available only in {@code In-Memory Accelerator For Hadoop} edition.
      *
@@ -333,8 +333,8 @@ public class IgfsConfiguration {
      * Default is {@code 0} which means that pre-fetching will start right away.
      * <h1 class="header">Integration With Hadoop</h1>
      * This parameter can be also overridden for individual Hadoop MapReduce tasks by passing
-     * {@code org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH}
-     * configuration property directly to Hadoop MapReduce task.
+     * {@code fs.igfs.[name].open.sequential_reads_before_prefetch} configuration property directly to Hadoop
+     * MapReduce task.
      * <p>
      * <b>NOTE:</b> Integration with Hadoop is available only in {@code In-Memory Accelerator For Hadoop} edition.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/core/src/main/java/org/apache/ignite_new/IgniteFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite_new/IgniteFileSystem.java b/modules/core/src/main/java/org/apache/ignite_new/IgniteFileSystem.java
index beb5e0e..5042b93 100644
--- a/modules/core/src/main/java/org/apache/ignite_new/IgniteFileSystem.java
+++ b/modules/core/src/main/java/org/apache/ignite_new/IgniteFileSystem.java
@@ -53,8 +53,8 @@ import java.util.*;
  * <h1 class="header">Integration With Hadoop</h1>
  * In addition to direct file system API, {@code IGFS} can be integrated with {@code Hadoop} by
  * plugging in as {@code Hadoop FileSystem}. Refer to
- * {@code org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem} or
- * {@code org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem} for more information.
+ * {@code org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem} or
+ * {@code org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem} for more information.
  * <p>
  * <b>NOTE:</b> integration with Hadoop is available only in {@code In-Memory Accelerator For Hadoop} edition.
 */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/core/src/main/java/org/apache/ignite_new/configuration/FileSystemConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite_new/configuration/FileSystemConfiguration.java b/modules/core/src/main/java/org/apache/ignite_new/configuration/FileSystemConfiguration.java
index 9f46b00..ea838fa 100644
--- a/modules/core/src/main/java/org/apache/ignite_new/configuration/FileSystemConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite_new/configuration/FileSystemConfiguration.java
@@ -27,7 +27,8 @@ import java.util.concurrent.*;
 
 /**
  * {@code IGFS} configuration. More than one file system can be configured within grid.
- * {@code IGFS} configuration is provided via {@link org.apache.ignite.configuration.IgniteConfiguration#getIgfsConfiguration()}
+ * {@code IGFS} configuration is provided via
+ * {@link org.apache.ignite.configuration.IgniteConfiguration#getIgfsConfiguration()}
  * method.
  * <p>
  * Refer to {@code config/hadoop/default-config.xml} or {@code config/hadoop/default-config-client.xml}
@@ -314,8 +315,8 @@ public class FileSystemConfiguration {
      * Default is {@code 0} which means that pre-fetching will start right away.
      * <h1 class="header">Integration With Hadoop</h1>
      * This parameter can be also overridden for individual Hadoop MapReduce tasks by passing
-     * {@code org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH}
-     * configuration property directly to Hadoop MapReduce task.
+     * {@code fs.igfs.[name].open.sequential_reads_before_prefetch} configuration property directly to Hadoop
+     * MapReduce task.
      * <p>
      * <b>NOTE:</b> Integration with Hadoop is available only in {@code In-Memory Accelerator For Hadoop} edition.
      *
@@ -334,8 +335,8 @@ public class FileSystemConfiguration {
      * Default is {@code 0} which means that pre-fetching will start right away.
      * <h1 class="header">Integration With Hadoop</h1>
      * This parameter can be also overridden for individual Hadoop MapReduce tasks by passing
-     * {@code org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH}
-     * configuration property directly to Hadoop MapReduce task.
+     * {@code fs.igfs.[name].open.sequential_reads_before_prefetch} configuration property directly to Hadoop
+     * MapReduce task.
      * <p>
      * <b>NOTE:</b> Integration with Hadoop is available only in {@code In-Memory Accelerator For Hadoop} edition.
 *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml b/modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml
index 2dc952a..43f9395 100644
--- a/modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml
+++ b/modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml
@@ -29,12 +29,12 @@
 
     <property>
         <name>fs.igfs.impl</name>
-        <value>org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem</value>
+        <value>org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem</value>
     </property>
 
     <property>
        <name>fs.AbstractFileSystem.igfs.impl</name>
-       <value>org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem</value>
+       <value>org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem</value>
     </property>
 
     <property>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/core/src/test/config/hadoop/core-site-loopback.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/hadoop/core-site-loopback.xml b/modules/core/src/test/config/hadoop/core-site-loopback.xml
index 63902fa..e1d1320 100644
--- a/modules/core/src/test/config/hadoop/core-site-loopback.xml
+++ b/modules/core/src/test/config/hadoop/core-site-loopback.xml
@@ -29,12 +29,12 @@
 
     <property>
         <name>fs.igfs.impl</name>
-        <value>org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem</value>
+        <value>org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem</value>
     </property>
 
     <property>
        <name>fs.AbstractFileSystem.igfs.impl</name>
-       <value>org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem</value>
+       <value>org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem</value>
     </property>
 
     <property>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/core/src/test/config/hadoop/core-site-secondary.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/hadoop/core-site-secondary.xml b/modules/core/src/test/config/hadoop/core-site-secondary.xml
index 648df36..fa301be 100644
--- a/modules/core/src/test/config/hadoop/core-site-secondary.xml
+++ b/modules/core/src/test/config/hadoop/core-site-secondary.xml
@@ -29,12 +29,12 @@
 
     <property>
         <name>fs.igfs.impl</name>
-        <value>org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem</value>
+        <value>org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem</value>
     </property>
 
     <property>
        <name>fs.AbstractFileSystem.igfs.impl</name>
-       <value>org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem</value>
+       <value>org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem</value>
     </property>
 
     <property>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/core/src/test/config/hadoop/core-site.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/hadoop/core-site.xml b/modules/core/src/test/config/hadoop/core-site.xml
index eb16ce7..0a9eee7 100644
--- a/modules/core/src/test/config/hadoop/core-site.xml
+++ b/modules/core/src/test/config/hadoop/core-site.xml
@@ -29,11 +29,11 @@
 
     <property>
         <name>fs.igfs.impl</name>
-        <value>org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem</value>
+        <value>org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem</value>
     </property>
 
     <property>
        <name>fs.AbstractFileSystem.igfs.impl</name>
-       <value>org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem</value>
+       <value>org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem</value>
     </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocol.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocol.java
deleted file mode 100644
index bd31951..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocol.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.hadoop;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.ipc.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.protocol.*;
-import org.apache.hadoop.mapreduce.security.token.delegation.*;
-import org.apache.hadoop.mapreduce.v2.*;
-import org.apache.hadoop.mapreduce.v2.jobhistory.*;
-import org.apache.hadoop.security.*;
-import org.apache.hadoop.security.authorize.*;
-import org.apache.hadoop.security.token.*;
-import org.apache.ignite.*;
-import org.apache.ignite.client.hadoop.counter.*;
-import org.apache.ignite.internal.client.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.proto.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*;
-
-/**
- * Hadoop client protocol.
- */
-public class GridHadoopClientProtocol implements ClientProtocol {
-    /** Ignite framework name property. */
-    public static final String FRAMEWORK_NAME = "ignite";
-
-    /** Protocol version. */
-    private static final long PROTO_VER = 1L;
-
-    /** Default Ignite system directory. */
-    private static final String SYS_DIR = ".ignite/system";
-
-    /** Configuration. */
-    private final Configuration conf;
-
-    /** Ignite client. */
-    private volatile GridClient cli;
-
-    /** Last received version. */
-    private long lastVer = -1;
-
-    /** Last received status. */
-    private GridHadoopJobStatus lastStatus;
-
-    /**
-     * Constructor.
-     *
-     * @param conf Configuration.
-     * @param cli Ignite client.
-     */
-    GridHadoopClientProtocol(Configuration conf, GridClient cli) {
-        assert cli != null;
-
-        this.conf = conf;
-        this.cli = cli;
-    }
-
-    /** {@inheritDoc} */
-    @Override public JobID getNewJobID() throws IOException, InterruptedException {
-        try {
-            conf.setLong(REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
-
-            GridHadoopJobId jobID = cli.compute().execute(GridHadoopProtocolNextTaskIdTask.class.getName(), null);
-
-            conf.setLong(RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
-
-            return new JobID(jobID.globalId().toString(), jobID.localId());
-        }
-        catch (GridClientException e) {
-            throw new IOException("Failed to get new job ID.", e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException,
-        InterruptedException {
-        try {
-            conf.setLong(JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis());
-
-            GridHadoopJobStatus status = cli.compute().execute(GridHadoopProtocolSubmitJobTask.class.getName(),
-                new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf)));
-
-            if (status == null)
-                throw new IOException("Failed to submit job (null status obtained): " + jobId);
-
-            return processStatus(status);
-        }
-        catch (GridClientException | IgniteCheckedException e) {
-            throw new IOException("Failed to submit job.", e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException {
-        return new ClusterMetrics(0, 0, 0, 0, 0, 0, 1000, 1000, 1, 100, 0, 0);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Cluster.JobTrackerStatus getJobTrackerStatus() throws IOException, InterruptedException {
-        return Cluster.JobTrackerStatus.RUNNING;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException {
-        return 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public AccessControlList getQueueAdmins(String queueName) throws IOException {
-        return new AccessControlList("*");
-    }
-
-    /** {@inheritDoc} */
-    @Override public void killJob(JobID jobId) throws IOException, InterruptedException {
-        try {
-            cli.compute().execute(GridHadoopProtocolKillJobTask.class.getName(),
-                new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
-        }
-        catch (GridClientException e) {
-            throw new IOException("Failed to kill job: " + jobId, e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setJobPriority(JobID jobid, String priority) throws IOException, InterruptedException {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException,
-        InterruptedException {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public JobStatus getJobStatus(JobID jobId) throws IOException, InterruptedException {
-        try {
-            Long delay = conf.getLong(GridHadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1);
-
-            GridHadoopProtocolTaskArguments args = delay >= 0 ?
-                new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) :
-                new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId());
-
-            GridHadoopJobStatus status = cli.compute().execute(GridHadoopProtocolJobStatusTask.class.getName(), args);
-
-            if (status == null)
-                throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
-
-            return processStatus(status);
-        }
-        catch (GridClientException e) {
-            throw new IOException("Failed to get job status: " + jobId, e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException {
-        try {
-            final GridHadoopCounters counters = cli.compute().execute(GridHadoopProtocolJobCountersTask.class.getName(),
-                new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
-
-            if (counters == null)
-                throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
-
-            return new GridHadoopClientCounters(counters);
-        }
-        catch (GridClientException e) {
-            throw new IOException("Failed to get job counters: " + jobId, e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public TaskReport[] getTaskReports(JobID jobid, TaskType type) throws IOException, InterruptedException {
-        return new TaskReport[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getFilesystemName() throws IOException, InterruptedException {
-        return FileSystem.get(conf).getUri().toString();
-    }
-
-    /** {@inheritDoc} */
-    @Override public JobStatus[] getAllJobs() throws IOException, InterruptedException {
-        return new JobStatus[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents)
-        throws IOException, InterruptedException {
-        return new TaskCompletionEvent[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException, InterruptedException {
-        return new String[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException {
-        return new TaskTrackerInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException {
-        return new TaskTrackerInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getSystemDir() throws IOException, InterruptedException {
-        Path sysDir = new Path(SYS_DIR);
-
-        return sysDir.toString();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getStagingAreaDir() throws IOException, InterruptedException {
-        String usr = UserGroupInformation.getCurrentUser().getShortUserName();
-
-        return GridHadoopUtils.stagingAreaDir(conf, usr).toString();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getJobHistoryDir() throws IOException, InterruptedException {
-        return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
-    }
-
-    /** {@inheritDoc} */
-    @Override public QueueInfo[] getQueues() throws IOException, InterruptedException {
-        return new QueueInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public QueueInfo getQueue(String queueName) throws IOException, InterruptedException {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException {
-        return new QueueAclsInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
-        return new QueueInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public QueueInfo[] getChildQueues(String queueName) throws IOException, InterruptedException {
-        return new QueueInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException,
-        InterruptedException {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException,
-        InterruptedException {
-        return 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException,
-        InterruptedException {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) throws IOException,
-        InterruptedException {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
-        return PROTO_VER;
-    }
-
-    /** {@inheritDoc} */
-    @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
-        throws IOException {
-        return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
-    }
-
-    /**
-     * Process received status update.
-     *
-     * @param status Ignite status.
-     * @return Hadoop status.
-     */
-    private JobStatus processStatus(GridHadoopJobStatus status) {
-        // IMPORTANT! This method will only work in single-threaded environment. It is valid at the moment because
-        // GridHadoopClientProtocolProvider creates new instance of this class for every new job and Job class
-        // serializes invocations of submitJob() and getJobStatus() methods. However, if any of these conditions will
-        // change in future and either protocol will serve statuses for several jobs or status update will not be
-        // serialized anymore, then we have to fallback to concurrent approach (e.g. using ConcurrentHashMap).
-        // (vozerov)
-        if (lastVer < status.version()) {
-            lastVer = status.version();
-
-            lastStatus = status;
-        }
-        else
-            assert lastStatus != null;
-
-        return GridHadoopUtils.status(lastStatus, conf);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolProvider.java
deleted file mode 100644
index a9a1c9d..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolProvider.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.hadoop;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.protocol.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.client.*;
-import org.apache.ignite.internal.client.marshaller.optimized.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.typedef.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.internal.client.GridClientProtocol.*;
-import static org.apache.ignite.client.hadoop.GridHadoopClientProtocol.*;
-
-
-/**
- * Grid Hadoop client protocol provider.
- */
-public class GridHadoopClientProtocolProvider extends ClientProtocolProvider {
-    /** Clients. */
-    private static final ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = new ConcurrentHashMap<>();
-
-    /** {@inheritDoc} */
-    @Override public ClientProtocol create(Configuration conf) throws IOException {
-        if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
-            String addr = conf.get(MRConfig.MASTER_ADDRESS);
-
-            if (F.isEmpty(addr))
-                throw new IOException("Failed to create client protocol because server address is not specified (is " +
-                    MRConfig.MASTER_ADDRESS + " property set?).");
-
-            if (F.eq(addr, "local"))
-                throw new IOException("Local execution mode is not supported, please point " +
-                    MRConfig.MASTER_ADDRESS + " to real Ignite node.");
-
-            return createProtocol(addr, conf);
-        }
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException {
-        if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME)))
-            return createProtocol(addr.getHostString() + ":" + addr.getPort(), conf);
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close(ClientProtocol cliProto) throws IOException {
-        // No-op.
-    }
-
-    /**
-     * Internal protocol creation routine.
-     *
-     * @param addr Address.
-     * @param conf Configuration.
-     * @return Client protocol.
-     * @throws IOException If failed.
-     */
-    private static ClientProtocol createProtocol(String addr, Configuration conf) throws IOException {
-        return new GridHadoopClientProtocol(conf, client(addr));
-    }
-
-    /**
-     * Create client.
-     *
-     * @param addr Endpoint address.
-     * @return Client.
-     * @throws IOException If failed.
-     */
-    private static GridClient client(String addr) throws IOException {
-        try {
-            IgniteInternalFuture<GridClient> fut = cliMap.get(addr);
-
-            if (fut == null) {
-                GridFutureAdapter<GridClient> fut0 = new GridFutureAdapter<>();
-
-                IgniteInternalFuture<GridClient> oldFut = cliMap.putIfAbsent(addr, fut0);
-
-                if (oldFut != null)
-                    return oldFut.get();
-                else {
-                    GridClientConfiguration cliCfg = new GridClientConfiguration();
-
-                    cliCfg.setProtocol(TCP);
-                    cliCfg.setServers(Collections.singletonList(addr));
-                    cliCfg.setMarshaller(new GridClientOptimizedMarshaller());
-                    cliCfg.setDaemon(true);
-
-                    try {
-                        GridClient cli = GridClientFactory.start(cliCfg);
-
-                        fut0.onDone(cli);
-
-                        return cli;
-                    }
-                    catch (GridClientException e) {
-                        fut0.onDone(e);
-
-                        throw new IOException("Failed to establish connection with Ignite node: " + addr, e);
-                    }
-                }
-            }
-            else
-                return fut.get();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException("Failed to establish connection with Ignite node: " + addr, e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounterGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounterGroup.java
deleted file mode 100644
index 37cd28d..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounterGroup.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.hadoop.counter;
-
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.counters.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Hadoop Client API Counters adapter.
- */
-class GridHadoopClientCounterGroup implements CounterGroup {
-    /** Counters. */
-    private final GridHadoopClientCounters cntrs;
-
-    /** Group name. */
-    private final String name;
-
-    /**
-     * Creates new instance.
-     *
-     * @param cntrs Client counters instance.
-     * @param name Group name.
-     */
-    GridHadoopClientCounterGroup(GridHadoopClientCounters cntrs, String name) {
-        this.cntrs = cntrs;
-        this.name = name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getName() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getDisplayName() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setDisplayName(String displayName) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void addCounter(Counter counter) {
-        addCounter(counter.getName(), counter.getDisplayName(), 0);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter addCounter(String name, String displayName, long value) {
-        final Counter counter = cntrs.findCounter(this.name, name);
-
-        counter.setValue(value);
-
-        return counter;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter findCounter(String counterName, String displayName) {
-        return cntrs.findCounter(name, counterName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter findCounter(String counterName, boolean create) {
-        return cntrs.findCounter(name, counterName, create);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter findCounter(String counterName) {
-        return cntrs.findCounter(name, counterName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        return cntrs.groupSize(name);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
-        for (final Counter counter : rightGroup)
-            cntrs.findCounter(name, counter.getName()).increment(counter.getValue());
-    }
-
-    /** {@inheritDoc} */
-    @Override public CounterGroupBase<Counter> getUnderlyingGroup() {
-        return this;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterator<Counter> iterator() {
-        return cntrs.iterateGroup(name);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(DataOutput out) throws IOException {
-        throw new UnsupportedOperationException("not implemented");
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readFields(DataInput in) throws IOException {
-        throw new UnsupportedOperationException("not implemented");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounters.java
deleted file mode 100644
index 9f4ec02..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounters.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.hadoop.counter;
-
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.counters.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-import org.apache.ignite.internal.util.typedef.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Hadoop Client API Counters adapter.
- */
-public class GridHadoopClientCounters extends Counters {
-    /** */
-    private final Map<T2<String,String>,GridHadoopLongCounter> cntrs = new HashMap<>();
-
-    /**
-     * Creates new instance based on given counters.
-     *
-     * @param cntrs Counters to adapt.
-     */
-    public GridHadoopClientCounters(GridHadoopCounters cntrs) {
-        for (GridHadoopCounter cntr : cntrs.all())
-            if (cntr instanceof GridHadoopLongCounter)
-                this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (GridHadoopLongCounter) cntr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized CounterGroup addGroup(CounterGroup grp) {
-        return addGroup(grp.getName(), grp.getDisplayName());
-    }
-
-    /** {@inheritDoc} */
-    @Override public CounterGroup addGroup(String name, String displayName) {
-        return new GridHadoopClientCounterGroup(this, name);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter findCounter(String grpName, String cntrName) {
-        return findCounter(grpName, cntrName, true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized Counter findCounter(Enum<?> key) {
-        return findCounter(key.getDeclaringClass().getName(), key.name(), true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) {
-        return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name());
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized Iterable<String> getGroupNames() {
-        Collection<String> res = new HashSet<>();
-
-        for (GridHadoopCounter counter : cntrs.values())
-            res.add(counter.group());
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterator<CounterGroup> iterator() {
-        final Iterator<String> iter = getGroupNames().iterator();
-
-        return new Iterator<CounterGroup>() {
-            @Override public boolean hasNext() {
-                return iter.hasNext();
-            }
-
-            @Override public CounterGroup next() {
-                if (!hasNext())
-                    throw new NoSuchElementException();
-
-                return new GridHadoopClientCounterGroup(GridHadoopClientCounters.this, iter.next());
-            }
-
-            @Override public void remove() {
-                throw new UnsupportedOperationException("not implemented");
-            }
-        };
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized CounterGroup getGroup(String grpName) {
-        return new GridHadoopClientCounterGroup(this, grpName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int countCounters() {
-        return cntrs.size();
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void write(DataOutput out) throws IOException {
-        throw new UnsupportedOperationException("not implemented");
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void readFields(DataInput in) throws IOException {
-        throw new UnsupportedOperationException("not implemented");
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) {
-        for (CounterGroup group : other) {
-            for (Counter counter : group) {
-                findCounter(group.getName(), counter.getName()).increment(counter.getValue());
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object genericRight) {
-        if (!(genericRight instanceof GridHadoopClientCounters))
-            return false;
-
-        return cntrs.equals(((GridHadoopClientCounters) genericRight).cntrs);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return cntrs.hashCode();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setWriteAllCounters(boolean snd) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean getWriteAllCounters() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Limits limits() {
-        return null;
-    }
-
-    /**
-     * Returns size of a group.
-     *
-     * @param grpName Name of the group.
-     * @return amount of counters in the given group.
-     */
-    public int groupSize(String grpName) {
-        int res = 0;
-
-        for (GridHadoopCounter counter : cntrs.values()) {
-            if (grpName.equals(counter.group()))
-                res++;
-        }
-
-        return res;
-    }
-
-    /**
-     * Returns counters iterator for specified group.
-     *
-     * @param grpName Name of the group to iterate.
-     * @return Counters iterator.
-     */
-    public Iterator<Counter> iterateGroup(String grpName) {
-        Collection<Counter> grpCounters = new ArrayList<>();
-
-        for (GridHadoopLongCounter counter : cntrs.values()) {
-            if (grpName.equals(counter.group()))
-                grpCounters.add(new GridHadoopV2Counter(counter));
-        }
-
-        return grpCounters.iterator();
-    }
-
-    /**
-     * Find a counter in the group.
-     *
-     * @param grpName The name of the counter group.
-     * @param cntrName The name of the counter.
-     * @param create Create the counter if not found if true.
-     * @return The counter that was found or added or {@code null} if create is false.
-     */
-    public Counter findCounter(String grpName, String cntrName, boolean create) {
-        T2<String, String> key = new T2<>(grpName, cntrName);
-
-        GridHadoopLongCounter internalCntr = cntrs.get(key);
-
-        if (internalCntr == null & create) {
-            internalCntr = new GridHadoopLongCounter(grpName,cntrName);
-
-            cntrs.put(key, new GridHadoopLongCounter(grpName,cntrName));
-        }
-
-        return internalCntr == null ? null : new GridHadoopV2Counter(internalCntr);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/package.html
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/package.html b/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/package.html
deleted file mode 100644
index f182598..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/package.html
+++ /dev/null
@@ -1,24 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements. See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License. You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<html>
-<body>
-    <!-- Package description. -->
-    Ignite Hadoop client protocol.
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java
new file mode 100644
index 0000000..5f06a65
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.ipc.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.igfs.hadoop.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Adapter to use any Hadoop file system {@link org.apache.hadoop.fs.FileSystem} as {@link org.apache.ignite.igfs.Igfs}.
+ */
+public class IgniteHadoopSecondaryFileSystem implements Igfs, AutoCloseable {
+    /** Property name for path to Hadoop configuration. */
+    public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH";
+
+    /** Property name for URI of file system. */
+    public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
+
+    /** Hadoop file system. */
+    private final FileSystem fileSys;
+
+    /** Properties of file system */
+    private final Map<String, String> props = new HashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param uri URI of file system.
+     * @param cfgPath Additional path to Hadoop configuration.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public IgniteHadoopSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath)
+        throws IgniteCheckedException {
+        Configuration cfg = new Configuration();
+
+        if (cfgPath != null)
+            cfg.addResource(U.resolveIgniteUrl(cfgPath));
+
+        try {
+            fileSys = uri == null ? FileSystem.get(cfg) : FileSystem.get(new URI(uri), cfg);
+        }
+        catch (IOException | URISyntaxException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        uri = fileSys.getUri().toString();
+
+        if (!uri.endsWith("/"))
+            uri += "/";
+
+        props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
+        props.put(SECONDARY_FS_URI, uri);
+    }
+
+    /**
+     * Convert IGFS path into Hadoop path.
+     *
+     * @param path IGFS path.
+     * @return Hadoop path.
+     */
+    private Path convert(IgfsPath path) {
+        URI uri = fileSys.getUri();
+
+        return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
+    }
+
+    /**
+     * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception.
+     *
+     * @param e Exception to check.
+     * @param detailMsg Detailed error message.
+     * @return Appropriate exception.
+     */
+    private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
+        boolean wrongVer = X.hasCause(e, RemoteException.class) ||
+            (e.getMessage() != null && e.getMessage().contains("Failed on local"));
+
+        IgfsException igfsErr = !wrongVer ? cast(detailMsg, e) :
+            new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " +
+                "version.", e);
+
+        return igfsErr;
+    }
+
+    /**
+     * Cast IO exception to IGFS exception.
+     *
+     * @param e IO exception.
+     * @return IGFS exception.
+     */
+    public static IgfsException cast(String msg, IOException e) {
+        if (e instanceof FileNotFoundException)
+            return new IgfsFileNotFoundException(e);
+        else if (e instanceof ParentNotDirectoryException)
+            return new IgfsParentNotDirectoryException(msg, e);
+        else if (e instanceof PathIsNotEmptyDirectoryException)
+            return new IgfsDirectoryNotEmptyException(e);
+        else if (e instanceof PathExistsException)
+            return new IgfsPathAlreadyExistsException(msg, e);
+        else
+            return new IgfsException(msg, e);
+    }
+
+    /**
+     * Convert Hadoop FileStatus properties to map.
+     *
+     * @param status File status.
+     * @return IGFS attributes.
+     */
+    private static Map<String, String> properties(FileStatus status) {
+        FsPermission perm = status.getPermission();
+
+        if (perm == null)
+            perm = FsPermission.getDefault();
+
+        return F.asMap(PROP_PERMISSION, String.format("%04o", perm.toShort()), PROP_USER_NAME, status.getOwner(),
+            PROP_GROUP_NAME, status.getGroup());
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exists(IgfsPath path) {
+        try {
+            return fileSys.exists(convert(path));
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
+        IgfsHadoopFSProperties props0 = new IgfsHadoopFSProperties(props);
+
+        try {
+            if (props0.userName() != null || props0.groupName() != null)
+                fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
+
+            if (props0.permission() != null)
+                fileSys.setPermission(convert(path), props0.permission());
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
+        }
+
+        //Result is not used in case of secondary FS.
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void rename(IgfsPath src, IgfsPath dest) {
+        // Delegate to the secondary file system.
+ try { + if (!fileSys.rename(convert(src), convert(dest))) + throw new IgfsException("Failed to rename (secondary file system returned false) " + + "[src=" + src + ", dest=" + dest + ']'); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']'); + } + } + + /** {@inheritDoc} */ + @Override public boolean delete(IgfsPath path, boolean recursive) { + try { + return fileSys.delete(convert(path), recursive); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]"); + } + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path) { + try { + if (!fileSys.mkdirs(convert(path))) + throw new IgniteException("Failed to make directories [path=" + path + "]"); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]"); + } + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { + try { + if (!fileSys.mkdirs(convert(path), new IgfsHadoopFSProperties(props).permission())) + throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]"); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]"); + } + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(IgfsPath path) { + try { + FileStatus[] statuses = fileSys.listStatus(convert(path)); + + if (statuses == null) + throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path); + + Collection<IgfsPath> res = new ArrayList<>(statuses.length); + + for (FileStatus status : statuses) + res.add(new IgfsPath(path, status.getPath().getName())); + + return res; + } + catch (FileNotFoundException ignored) { + throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path); + } + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(IgfsPath path) { + try { + FileStatus[] statuses = fileSys.listStatus(convert(path)); + + if (statuses == null) + throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path); + + Collection<IgfsFile> res = new ArrayList<>(statuses.length); + + for (FileStatus status : statuses) { + IgfsFileInfo fsInfo = status.isDirectory() ? 
+                    new IgfsFileInfo(true, properties(status)) :
+                    new IgfsFileInfo((int)status.getBlockSize(), status.getLen(), null, null, false,
+                        properties(status));
+
+                res.add(new IgfsFileImpl(new IgfsPath(path, status.getPath().getName()), fsInfo, 1));
+            }
+
+            return res;
+        }
+        catch (FileNotFoundException ignored) {
+            throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsReader open(IgfsPath path, int bufSize) {
+        return new IgfsHadoopReader(fileSys, convert(path), bufSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputStream create(IgfsPath path, boolean overwrite) {
+        try {
+            return fileSys.create(convert(path), overwrite);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
+        long blockSize, @Nullable Map<String, String> props) {
+        IgfsHadoopFSProperties props0 =
+            new IgfsHadoopFSProperties(props != null ? props : Collections.<String, String>emptyMap());
+
+        try {
+            return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication,
+                blockSize, null);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
+                ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication +
+                ", blockSize=" + blockSize + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
+        @Nullable Map<String, String> props) {
+        try {
+            return fileSys.append(convert(path), bufSize);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile info(final IgfsPath path) {
+        try {
+            final FileStatus status = fileSys.getFileStatus(convert(path));
+
+            if (status == null)
+                return null;
+
+            final Map<String, String> props = properties(status);
+
+            return new IgfsFile() {
+                @Override public IgfsPath path() {
+                    return path;
+                }
+
+                @Override public boolean isFile() {
+                    return status.isFile();
+                }
+
+                @Override public boolean isDirectory() {
+                    return status.isDirectory();
+                }
+
+                @Override public int blockSize() {
+                    return (int)status.getBlockSize();
+                }
+
+                @Override public long groupBlockSize() {
+                    return status.getBlockSize();
+                }
+
+                @Override public long accessTime() {
+                    return status.getAccessTime();
+                }
+
+                @Override public long modificationTime() {
+                    return status.getModificationTime();
+                }
+
+                @Override public String property(String name) throws IllegalArgumentException {
+                    String val = props.get(name);
+
+                    if (val == null)
+                        throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');
+
+                    return val;
+                }
+
+                @Nullable @Override public String property(String name, @Nullable String dfltVal) {
+                    String val = props.get(name);
+
+                    return val == null ?
+                        dfltVal : val;
+                }
+
+                @Override public long length() {
+                    return status.getLen();
+                }
+
+                /** {@inheritDoc} */
+                @Override public Map<String, String> properties() {
+                    return props;
+                }
+            };
+        }
+        catch (FileNotFoundException ignore) {
+            return null;
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public long usedSpaceSize() {
+        try {
+            return fileSys.getContentSummary(new Path(fileSys.getUri())).getSpaceConsumed();
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Map<String, String> properties() {
+        return props;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IgniteCheckedException {
+        try {
+            fileSys.close();
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/package.html
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/package.html b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/package.html
new file mode 100644
index 0000000..1d78952
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/package.html
@@ -0,0 +1,24 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<body>
+    <!-- Package description. -->
+    Ignite Hadoop Accelerator file system API.
+</body>
+</html>
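
Editor's note on the reworked API: the class above, org.apache.ignite.hadoop.fs.IgniteHadoopSecondaryFileSystem, is the pass-through layer that IGFS delegates to when running in dual mode against HDFS. The sketch below shows how it would typically be wired into a node configuration. It is a minimal illustration only: it assumes the (uri, cfgPath) constructor carried over from the old IgfsHadoopFileSystemWrapper, plus FileSystemConfiguration.setSecondaryFileSystem(...) and IgniteConfiguration.setFileSystemConfiguration(...) setters from the renamed configuration classes; the example class name, namenode URI, and core-site.xml path are placeholders, not part of this commit.

    import org.apache.ignite.Ignition;
    import org.apache.ignite.configuration.FileSystemConfiguration;
    import org.apache.ignite.configuration.IgniteConfiguration;
    import org.apache.ignite.hadoop.fs.IgniteHadoopSecondaryFileSystem;

    public class IgfsSecondaryFsExample {
        public static void main(String[] args) {
            FileSystemConfiguration fsCfg = new FileSystemConfiguration();

            fsCfg.setName("igfs");

            // Delegate misses and dual-mode operations to HDFS.
            // The namenode URI and core-site.xml path are placeholders.
            fsCfg.setSecondaryFileSystem(
                new IgniteHadoopSecondaryFileSystem("hdfs://namenode-host:9000", "/path/to/core-site.xml"));

            IgniteConfiguration cfg = new IgniteConfiguration();

            cfg.setFileSystemConfiguration(fsCfg);

            Ignition.start(cfg);
        }
    }

With this wiring, any IOException raised by the underlying Hadoop FileSystem is translated by cast(...) / handleSecondaryFsError(...) above into the corresponding IgfsException subclass, so callers see uniform IGFS semantics regardless of the secondary file system.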