http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java deleted file mode 100644 index e1bf9b6..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.hdfs.protocol.*; -import org.apache.ignite.hadoop.fs.v1.*; - -/** - * Utilities for configuring file systems to support the separate working directory per each thread. - */ -public class GridHadoopFileSystemsUtils { - /** Name of the property for setting working directory on create new local FS instance. */ - public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir"; - - /** - * Set user name and default working directory for current thread if it's supported by file system. - * - * @param fs File system. - * @param userName User name. - */ - public static void setUser(FileSystem fs, String userName) { - if (fs instanceof IgniteHadoopFileSystem) - ((IgniteHadoopFileSystem)fs).setUser(userName); - else if (fs instanceof GridHadoopDistributedFileSystem) - ((GridHadoopDistributedFileSystem)fs).setUser(userName); - } - - /** - * Setup wrappers of filesystems to support the separate working directory. - * - * @param cfg Config for setup. - */ - public static void setupFileSystems(Configuration cfg) { - cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", GridHadoopLocalFileSystemV1.class.getName()); - cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", - GridHadoopLocalFileSystemV2.class.getName()); - - cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", GridHadoopDistributedFileSystem.class.getName()); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java deleted file mode 100644 index 28834d4..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import org.apache.hadoop.fs.*; - -import java.io.*; - -/** - * Local file system replacement for Hadoop jobs. - */ -public class GridHadoopLocalFileSystemV1 extends LocalFileSystem { - /** - * Creates new local file system. - */ - public GridHadoopLocalFileSystemV1() { - super(new GridHadoopRawLocalFileSystem()); - } - - /** {@inheritDoc} */ - @Override public File pathToFile(Path path) { - return ((GridHadoopRawLocalFileSystem)getRaw()).convert(path); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java deleted file mode 100644 index 62d7cea..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.local.*; - -import java.io.*; -import java.net.*; - -import static org.apache.hadoop.fs.FsConstants.*; - -/** - * Local file system replacement for Hadoop jobs. - */ -public class GridHadoopLocalFileSystemV2 extends ChecksumFs { - /** - * Creates new local file system. - * - * @param cfg Configuration. - * @throws IOException If failed. - * @throws URISyntaxException If failed. - */ - public GridHadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException { - super(new DelegateFS(cfg)); - } - - /** - * Creates new local file system. - * - * @param uri URI. - * @param cfg Configuration. - * @throws IOException If failed. - * @throws URISyntaxException If failed. - */ - public GridHadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException { - this(cfg); - } - - /** - * Delegate file system. - */ - private static class DelegateFS extends DelegateToFileSystem { - /** - * Creates new local file system. - * - * @param cfg Configuration. - * @throws IOException If failed. - * @throws URISyntaxException If failed. - */ - public DelegateFS(Configuration cfg) throws IOException, URISyntaxException { - super(LOCAL_FS_URI, new GridHadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false); - } - - /** {@inheritDoc} */ - @Override public int getUriDefaultPort() { - return -1; - } - - /** {@inheritDoc} */ - @Override public FsServerDefaults getServerDefaults() throws IOException { - return LocalConfigKeys.getServerDefaults(); - } - - /** {@inheritDoc} */ - @Override public boolean isValidName(String src) { - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java deleted file mode 100644 index 29645f8..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.*; -import org.apache.hadoop.util.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.net.*; -import java.nio.file.*; - -/** - * Local file system implementation for Hadoop. - */ -public class GridHadoopRawLocalFileSystem extends FileSystem { - /** Working directory for each thread. */ - private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() { - @Override protected Path initialValue() { - return getInitialWorkingDirectory(); - } - }; - - /** - * Converts Hadoop path to local path. - * - * @param path Hadoop path. - * @return Local path. - */ - File convert(Path path) { - checkPath(path); - - if (path.isAbsolute()) - return new File(path.toUri().getPath()); - - return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath()); - } - - /** {@inheritDoc} */ - @Override public Path getHomeDirectory() { - return makeQualified(new Path(System.getProperty("user.home"))); - } - - /** {@inheritDoc} */ - @Override public Path getInitialWorkingDirectory() { - File f = new File(System.getProperty("user.dir")); - - return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null); - } - - /** {@inheritDoc} */ - @Override public void initialize(URI uri, Configuration conf) throws IOException { - super.initialize(uri, conf); - - setConf(conf); - - String initWorkDir = conf.get(GridHadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP); - - if (initWorkDir != null) - setWorkingDirectory(new Path(initWorkDir)); - } - - /** {@inheritDoc} */ - @Override public URI getUri() { - return FsConstants.LOCAL_FS_URI; - } - - /** {@inheritDoc} */ - @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { - return new FSDataInputStream(new InStream(checkExists(convert(f)))); - } - - /** {@inheritDoc} */ - @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize, - short replication, long blockSize, Progressable progress) throws IOException { - File file = convert(f); - - if (!overwrite && !file.createNewFile()) - throw new IOException("Failed to create new file: " + f.toUri()); - - return out(file, false, bufSize); - } - - /** - * @param file File. - * @param append Append flag. - * @return Output stream. - * @throws IOException If failed. - */ - private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException { - return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append), - bufSize < 32 * 1024 ? 32 * 1024 : bufSize), new Statistics(getUri().getScheme())); - } - - /** {@inheritDoc} */ - @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException { - return out(convert(f), true, bufSize); - } - - /** {@inheritDoc} */ - @Override public boolean rename(Path src, Path dst) throws IOException { - return convert(src).renameTo(convert(dst)); - } - - /** {@inheritDoc} */ - @Override public boolean delete(Path f, boolean recursive) throws IOException { - File file = convert(f); - - if (file.isDirectory() && !recursive) - throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri()); - - return U.delete(file); - } - - /** {@inheritDoc} */ - @Override public void setWorkingDirectory(Path dir) { - workDir.set(fixRelativePart(dir)); - - checkPath(dir); - } - - /** {@inheritDoc} */ - @Override public Path getWorkingDirectory() { - return workDir.get(); - } - - /** {@inheritDoc} */ - @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { - if(f == null) - throw new IllegalArgumentException("mkdirs path arg is null"); - - Path parent = f.getParent(); - - File p2f = convert(f); - - if(parent != null) { - File parent2f = convert(parent); - - if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) - throw new FileAlreadyExistsException("Parent path is not a directory: " + parent); - - } - - return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory()); - } - - /** {@inheritDoc} */ - @Override public FileStatus getFileStatus(Path f) throws IOException { - return fileStatus(checkExists(convert(f))); - } - - /** - * @return File status. - */ - private FileStatus fileStatus(File file) throws IOException { - boolean dir = file.isDirectory(); - - java.nio.file.Path path = dir ? null : file.toPath(); - - return new FileStatus(dir ? 0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(), - /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ? - new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI())); - } - - /** - * @param file File. - * @return Same file. - * @throws FileNotFoundException If does not exist. - */ - private static File checkExists(File file) throws FileNotFoundException { - if (!file.exists()) - throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist."); - - return file; - } - - /** {@inheritDoc} */ - @Override public FileStatus[] listStatus(Path f) throws IOException { - File file = convert(f); - - if (checkExists(file).isFile()) - return new FileStatus[] {fileStatus(file)}; - - File[] files = file.listFiles(); - - FileStatus[] res = new FileStatus[files.length]; - - for (int i = 0; i < res.length; i++) - res[i] = fileStatus(files[i]); - - return res; - } - - /** {@inheritDoc} */ - @Override public boolean supportsSymlinks() { - return true; - } - - /** {@inheritDoc} */ - @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException { - Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath()); - } - - /** {@inheritDoc} */ - @Override public FileStatus getFileLinkStatus(Path f) throws IOException { - return getFileStatus(getLinkTarget(f)); - } - - /** {@inheritDoc} */ - @Override public Path getLinkTarget(Path f) throws IOException { - File file = Files.readSymbolicLink(convert(f).toPath()).toFile(); - - return new Path(file.toURI()); - } - - /** - * Input stream. - */ - private static class InStream extends InputStream implements Seekable, PositionedReadable { - /** */ - private final RandomAccessFile file; - - /** - * @param f File. - * @throws IOException If failed. - */ - public InStream(File f) throws IOException { - file = new RandomAccessFile(f, "r"); - } - - /** {@inheritDoc} */ - @Override public synchronized int read() throws IOException { - return file.read(); - } - - /** {@inheritDoc} */ - @Override public synchronized int read(byte[] b, int off, int len) throws IOException { - return file.read(b, off, len); - } - - /** {@inheritDoc} */ - @Override public synchronized void close() throws IOException { - file.close(); - } - - /** {@inheritDoc} */ - @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException { - long pos0 = file.getFilePointer(); - - file.seek(pos); - int res = file.read(buf, off, len); - - file.seek(pos0); - - return res; - } - - /** {@inheritDoc} */ - @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException { - if (read(pos, buf, off, len) != len) - throw new IOException(); - } - - /** {@inheritDoc} */ - @Override public void readFully(long pos, byte[] buf) throws IOException { - readFully(pos, buf, 0, buf.length); - } - - /** {@inheritDoc} */ - @Override public synchronized void seek(long pos) throws IOException { - file.seek(pos); - } - - /** {@inheritDoc} */ - @Override public synchronized long getPos() throws IOException { - return file.getFilePointer(); - } - - /** {@inheritDoc} */ - @Override public boolean seekToNewSource(long targetPos) throws IOException { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java new file mode 100644 index 0000000..88c5899 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.hdfs.*; +import org.apache.hadoop.mapreduce.*; + +import java.io.*; +import java.net.*; + +import static org.apache.ignite.configuration.IgfsConfiguration.*; + +/** + * Wrapper of HDFS for support of separated working directory. + */ +public class HadoopDistributedFileSystem extends DistributedFileSystem { + /** User name for each thread. */ + private final ThreadLocal<String> userName = new ThreadLocal<String>() { + /** {@inheritDoc} */ + @Override protected String initialValue() { + return DFLT_USER_NAME; + } + }; + + /** Working directory for each thread. */ + private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() { + /** {@inheritDoc} */ + @Override protected Path initialValue() { + return getHomeDirectory(); + } + }; + + /** {@inheritDoc} */ + @Override public void initialize(URI uri, Configuration conf) throws IOException { + super.initialize(uri, conf); + + setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME)); + } + + /** + * Set user name and default working directory for current thread. + * + * @param userName User name. + */ + public void setUser(String userName) { + this.userName.set(userName); + + setWorkingDirectory(getHomeDirectory()); + } + + /** {@inheritDoc} */ + @Override public Path getHomeDirectory() { + Path path = new Path("/user/" + userName.get()); + + return path.makeQualified(getUri(), null); + } + + /** {@inheritDoc} */ + @Override public void setWorkingDirectory(Path dir) { + Path fixedDir = fixRelativePart(dir); + + String res = fixedDir.toUri().getPath(); + + if (!DFSUtil.isValidName(res)) + throw new IllegalArgumentException("Invalid DFS directory name " + res); + + workingDir.set(fixedDir); + } + + /** {@inheritDoc} */ + @Override public Path getWorkingDirectory() { + return workingDir.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java new file mode 100644 index 0000000..f3f51d4 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.hdfs.protocol.*; +import org.apache.ignite.hadoop.fs.v1.*; + +/** + * Utilities for configuring file systems to support the separate working directory per each thread. + */ +public class HadoopFileSystemsUtils { + /** Name of the property for setting working directory on create new local FS instance. */ + public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir"; + + /** + * Set user name and default working directory for current thread if it's supported by file system. + * + * @param fs File system. + * @param userName User name. + */ + public static void setUser(FileSystem fs, String userName) { + if (fs instanceof IgniteHadoopFileSystem) + ((IgniteHadoopFileSystem)fs).setUser(userName); + else if (fs instanceof HadoopDistributedFileSystem) + ((HadoopDistributedFileSystem)fs).setUser(userName); + } + + /** + * Setup wrappers of filesystems to support the separate working directory. + * + * @param cfg Config for setup. + */ + public static void setupFileSystems(Configuration cfg) { + cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName()); + cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", + HadoopLocalFileSystemV2.class.getName()); + + cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java new file mode 100644 index 0000000..9cc5881 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.hadoop.fs.*; + +import java.io.*; + +/** + * Local file system replacement for Hadoop jobs. + */ +public class HadoopLocalFileSystemV1 extends LocalFileSystem { + /** + * Creates new local file system. + */ + public HadoopLocalFileSystemV1() { + super(new HadoopRawLocalFileSystem()); + } + + /** {@inheritDoc} */ + @Override public File pathToFile(Path path) { + return ((HadoopRawLocalFileSystem)getRaw()).convert(path); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java new file mode 100644 index 0000000..15ddc5a --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.local.*; + +import java.io.*; +import java.net.*; + +import static org.apache.hadoop.fs.FsConstants.*; + +/** + * Local file system replacement for Hadoop jobs. + */ +public class HadoopLocalFileSystemV2 extends ChecksumFs { + /** + * Creates new local file system. + * + * @param cfg Configuration. + * @throws IOException If failed. + * @throws URISyntaxException If failed. + */ + public HadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException { + super(new DelegateFS(cfg)); + } + + /** + * Creates new local file system. + * + * @param uri URI. + * @param cfg Configuration. + * @throws IOException If failed. + * @throws URISyntaxException If failed. + */ + public HadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException { + this(cfg); + } + + /** + * Delegate file system. + */ + private static class DelegateFS extends DelegateToFileSystem { + /** + * Creates new local file system. + * + * @param cfg Configuration. + * @throws IOException If failed. + * @throws URISyntaxException If failed. + */ + public DelegateFS(Configuration cfg) throws IOException, URISyntaxException { + super(LOCAL_FS_URI, new HadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false); + } + + /** {@inheritDoc} */ + @Override public int getUriDefaultPort() { + return -1; + } + + /** {@inheritDoc} */ + @Override public FsServerDefaults getServerDefaults() throws IOException { + return LocalConfigKeys.getServerDefaults(); + } + + /** {@inheritDoc} */ + @Override public boolean isValidName(String src) { + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java new file mode 100644 index 0000000..e5ec3f7 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.*; +import org.apache.hadoop.util.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.net.*; +import java.nio.file.*; + +/** + * Local file system implementation for Hadoop. + */ +public class HadoopRawLocalFileSystem extends FileSystem { + /** Working directory for each thread. */ + private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() { + @Override protected Path initialValue() { + return getInitialWorkingDirectory(); + } + }; + + /** + * Converts Hadoop path to local path. + * + * @param path Hadoop path. + * @return Local path. + */ + File convert(Path path) { + checkPath(path); + + if (path.isAbsolute()) + return new File(path.toUri().getPath()); + + return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath()); + } + + /** {@inheritDoc} */ + @Override public Path getHomeDirectory() { + return makeQualified(new Path(System.getProperty("user.home"))); + } + + /** {@inheritDoc} */ + @Override public Path getInitialWorkingDirectory() { + File f = new File(System.getProperty("user.dir")); + + return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null); + } + + /** {@inheritDoc} */ + @Override public void initialize(URI uri, Configuration conf) throws IOException { + super.initialize(uri, conf); + + setConf(conf); + + String initWorkDir = conf.get(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP); + + if (initWorkDir != null) + setWorkingDirectory(new Path(initWorkDir)); + } + + /** {@inheritDoc} */ + @Override public URI getUri() { + return FsConstants.LOCAL_FS_URI; + } + + /** {@inheritDoc} */ + @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { + return new FSDataInputStream(new InStream(checkExists(convert(f)))); + } + + /** {@inheritDoc} */ + @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize, + short replication, long blockSize, Progressable progress) throws IOException { + File file = convert(f); + + if (!overwrite && !file.createNewFile()) + throw new IOException("Failed to create new file: " + f.toUri()); + + return out(file, false, bufSize); + } + + /** + * @param file File. + * @param append Append flag. + * @return Output stream. + * @throws IOException If failed. + */ + private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException { + return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append), + bufSize < 32 * 1024 ? 32 * 1024 : bufSize), new Statistics(getUri().getScheme())); + } + + /** {@inheritDoc} */ + @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException { + return out(convert(f), true, bufSize); + } + + /** {@inheritDoc} */ + @Override public boolean rename(Path src, Path dst) throws IOException { + return convert(src).renameTo(convert(dst)); + } + + /** {@inheritDoc} */ + @Override public boolean delete(Path f, boolean recursive) throws IOException { + File file = convert(f); + + if (file.isDirectory() && !recursive) + throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri()); + + return U.delete(file); + } + + /** {@inheritDoc} */ + @Override public void setWorkingDirectory(Path dir) { + workDir.set(fixRelativePart(dir)); + + checkPath(dir); + } + + /** {@inheritDoc} */ + @Override public Path getWorkingDirectory() { + return workDir.get(); + } + + /** {@inheritDoc} */ + @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { + if(f == null) + throw new IllegalArgumentException("mkdirs path arg is null"); + + Path parent = f.getParent(); + + File p2f = convert(f); + + if(parent != null) { + File parent2f = convert(parent); + + if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) + throw new FileAlreadyExistsException("Parent path is not a directory: " + parent); + + } + + return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory()); + } + + /** {@inheritDoc} */ + @Override public FileStatus getFileStatus(Path f) throws IOException { + return fileStatus(checkExists(convert(f))); + } + + /** + * @return File status. + */ + private FileStatus fileStatus(File file) throws IOException { + boolean dir = file.isDirectory(); + + java.nio.file.Path path = dir ? null : file.toPath(); + + return new FileStatus(dir ? 0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(), + /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ? + new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI())); + } + + /** + * @param file File. + * @return Same file. + * @throws FileNotFoundException If does not exist. + */ + private static File checkExists(File file) throws FileNotFoundException { + if (!file.exists()) + throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist."); + + return file; + } + + /** {@inheritDoc} */ + @Override public FileStatus[] listStatus(Path f) throws IOException { + File file = convert(f); + + if (checkExists(file).isFile()) + return new FileStatus[] {fileStatus(file)}; + + File[] files = file.listFiles(); + + FileStatus[] res = new FileStatus[files.length]; + + for (int i = 0; i < res.length; i++) + res[i] = fileStatus(files[i]); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean supportsSymlinks() { + return true; + } + + /** {@inheritDoc} */ + @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException { + Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath()); + } + + /** {@inheritDoc} */ + @Override public FileStatus getFileLinkStatus(Path f) throws IOException { + return getFileStatus(getLinkTarget(f)); + } + + /** {@inheritDoc} */ + @Override public Path getLinkTarget(Path f) throws IOException { + File file = Files.readSymbolicLink(convert(f).toPath()).toFile(); + + return new Path(file.toURI()); + } + + /** + * Input stream. + */ + private static class InStream extends InputStream implements Seekable, PositionedReadable { + /** */ + private final RandomAccessFile file; + + /** + * @param f File. + * @throws IOException If failed. + */ + public InStream(File f) throws IOException { + file = new RandomAccessFile(f, "r"); + } + + /** {@inheritDoc} */ + @Override public synchronized int read() throws IOException { + return file.read(); + } + + /** {@inheritDoc} */ + @Override public synchronized int read(byte[] b, int off, int len) throws IOException { + return file.read(b, off, len); + } + + /** {@inheritDoc} */ + @Override public synchronized void close() throws IOException { + file.close(); + } + + /** {@inheritDoc} */ + @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException { + long pos0 = file.getFilePointer(); + + file.seek(pos); + int res = file.read(buf, off, len); + + file.seek(pos0); + + return res; + } + + /** {@inheritDoc} */ + @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException { + if (read(pos, buf, off, len) != len) + throw new IOException(); + } + + /** {@inheritDoc} */ + @Override public void readFully(long pos, byte[] buf) throws IOException { + readFully(pos, buf, 0, buf.length); + } + + /** {@inheritDoc} */ + @Override public synchronized void seek(long pos) throws IOException { + file.seek(pos); + } + + /** {@inheritDoc} */ + @Override public synchronized long getPos() throws IOException { + return file.getFilePointer(); + } + + /** {@inheritDoc} */ + @Override public boolean seekToNewSource(long targetPos) throws IOException { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java deleted file mode 100644 index b124312..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java +++ /dev/null @@ -1,305 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.jobtracker; - -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*; - -/** - * Hadoop job metadata. Internal object used for distributed job state tracking. - */ -public class GridHadoopJobMetadata implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Job ID. */ - private GridHadoopJobId jobId; - - /** Job info. */ - private GridHadoopJobInfo jobInfo; - - /** Node submitted job. */ - private UUID submitNodeId; - - /** Map-reduce plan. */ - private GridHadoopMapReducePlan mrPlan; - - /** Pending splits for which mapper should be executed. */ - private Map<GridHadoopInputSplit, Integer> pendingSplits; - - /** Pending reducers. */ - private Collection<Integer> pendingReducers; - - /** Reducers addresses. */ - @GridToStringInclude - private Map<Integer, GridHadoopProcessDescriptor> reducersAddrs; - - /** Job phase. */ - private GridHadoopJobPhase phase = PHASE_SETUP; - - /** Fail cause. */ - @GridToStringExclude - private Throwable failCause; - - /** Version. */ - private long ver; - - /** Job counters */ - private GridHadoopCounters counters = new GridHadoopCountersImpl(); - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridHadoopJobMetadata() { - // No-op. - } - - /** - * Constructor. - * - * @param submitNodeId Submit node ID. - * @param jobId Job ID. - * @param jobInfo Job info. - */ - public GridHadoopJobMetadata(UUID submitNodeId, GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) { - this.jobId = jobId; - this.jobInfo = jobInfo; - this.submitNodeId = submitNodeId; - } - - /** - * Copy constructor. - * - * @param src Metadata to copy. - */ - public GridHadoopJobMetadata(GridHadoopJobMetadata src) { - // Make sure to preserve alphabetic order. - counters = src.counters; - failCause = src.failCause; - jobId = src.jobId; - jobInfo = src.jobInfo; - mrPlan = src.mrPlan; - pendingSplits = src.pendingSplits; - pendingReducers = src.pendingReducers; - phase = src.phase; - reducersAddrs = src.reducersAddrs; - submitNodeId = src.submitNodeId; - ver = src.ver + 1; - } - - /** - * @return Submit node ID. - */ - public UUID submitNodeId() { - return submitNodeId; - } - - /** - * @param phase Job phase. - */ - public void phase(GridHadoopJobPhase phase) { - this.phase = phase; - } - - /** - * @return Job phase. - */ - public GridHadoopJobPhase phase() { - return phase; - } - - /** - * Gets reducers addresses for external execution. - * - * @return Reducers addresses. - */ - public Map<Integer, GridHadoopProcessDescriptor> reducersAddresses() { - return reducersAddrs; - } - - /** - * Sets reducers addresses for external execution. - * - * @param reducersAddrs Map of addresses. - */ - public void reducersAddresses(Map<Integer, GridHadoopProcessDescriptor> reducersAddrs) { - this.reducersAddrs = reducersAddrs; - } - - /** - * Sets collection of pending splits. - * - * @param pendingSplits Collection of pending splits. - */ - public void pendingSplits(Map<GridHadoopInputSplit, Integer> pendingSplits) { - this.pendingSplits = pendingSplits; - } - - /** - * Gets collection of pending splits. - * - * @return Collection of pending splits. - */ - public Map<GridHadoopInputSplit, Integer> pendingSplits() { - return pendingSplits; - } - - /** - * Sets collection of pending reducers. - * - * @param pendingReducers Collection of pending reducers. - */ - public void pendingReducers(Collection<Integer> pendingReducers) { - this.pendingReducers = pendingReducers; - } - - /** - * Gets collection of pending reducers. - * - * @return Collection of pending reducers. - */ - public Collection<Integer> pendingReducers() { - return pendingReducers; - } - - /** - * @return Job ID. - */ - public GridHadoopJobId jobId() { - return jobId; - } - - /** - * @param mrPlan Map-reduce plan. - */ - public void mapReducePlan(GridHadoopMapReducePlan mrPlan) { - assert this.mrPlan == null : "Map-reduce plan can only be initialized once."; - - this.mrPlan = mrPlan; - } - - /** - * @return Map-reduce plan. - */ - public GridHadoopMapReducePlan mapReducePlan() { - return mrPlan; - } - - /** - * @return Job info. - */ - public GridHadoopJobInfo jobInfo() { - return jobInfo; - } - - /** - * Returns job counters. - * - * @return Collection of counters. - */ - public GridHadoopCounters counters() { - return counters; - } - - /** - * Sets counters. - * - * @param counters Collection of counters. - */ - public void counters(GridHadoopCounters counters) { - this.counters = counters; - } - - /** - * @param failCause Fail cause. - */ - public void failCause(Throwable failCause) { - assert failCause != null; - - if (this.failCause == null) // Keep the first error. - this.failCause = failCause; - } - - /** - * @return Fail cause. - */ - public Throwable failCause() { - return failCause; - } - - /** - * @return Version. - */ - public long version() { - return ver; - } - - /** - * @param split Split. - * @return Task number. - */ - public int taskNumber(GridHadoopInputSplit split) { - return pendingSplits.get(split); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeUuid(out, submitNodeId); - out.writeObject(jobId); - out.writeObject(jobInfo); - out.writeObject(mrPlan); - out.writeObject(pendingSplits); - out.writeObject(pendingReducers); - out.writeObject(phase); - out.writeObject(failCause); - out.writeLong(ver); - out.writeObject(reducersAddrs); - out.writeObject(counters); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - submitNodeId = U.readUuid(in); - jobId = (GridHadoopJobId)in.readObject(); - jobInfo = (GridHadoopJobInfo)in.readObject(); - mrPlan = (GridHadoopMapReducePlan)in.readObject(); - pendingSplits = (Map<GridHadoopInputSplit,Integer>)in.readObject(); - pendingReducers = (Collection<Integer>)in.readObject(); - phase = (GridHadoopJobPhase)in.readObject(); - failCause = (Throwable)in.readObject(); - ver = in.readLong(); - reducersAddrs = (Map<Integer, GridHadoopProcessDescriptor>)in.readObject(); - counters = (GridHadoopCounters)in.readObject(); - } - - /** {@inheritDoc} */ - public String toString() { - return S.toString(GridHadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(), - "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null : - failCause.getClass().getName()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java new file mode 100644 index 0000000..6042775 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.jobtracker; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*; + +/** + * Hadoop job metadata. Internal object used for distributed job state tracking. + */ +public class HadoopJobMetadata implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Job ID. */ + private GridHadoopJobId jobId; + + /** Job info. */ + private GridHadoopJobInfo jobInfo; + + /** Node submitted job. */ + private UUID submitNodeId; + + /** Map-reduce plan. */ + private GridHadoopMapReducePlan mrPlan; + + /** Pending splits for which mapper should be executed. */ + private Map<GridHadoopInputSplit, Integer> pendingSplits; + + /** Pending reducers. */ + private Collection<Integer> pendingReducers; + + /** Reducers addresses. */ + @GridToStringInclude + private Map<Integer, HadoopProcessDescriptor> reducersAddrs; + + /** Job phase. */ + private GridHadoopJobPhase phase = PHASE_SETUP; + + /** Fail cause. */ + @GridToStringExclude + private Throwable failCause; + + /** Version. */ + private long ver; + + /** Job counters */ + private GridHadoopCounters counters = new HadoopCountersImpl(); + + /** + * Empty constructor required by {@link Externalizable}. + */ + public HadoopJobMetadata() { + // No-op. + } + + /** + * Constructor. + * + * @param submitNodeId Submit node ID. + * @param jobId Job ID. + * @param jobInfo Job info. + */ + public HadoopJobMetadata(UUID submitNodeId, GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) { + this.jobId = jobId; + this.jobInfo = jobInfo; + this.submitNodeId = submitNodeId; + } + + /** + * Copy constructor. + * + * @param src Metadata to copy. + */ + public HadoopJobMetadata(HadoopJobMetadata src) { + // Make sure to preserve alphabetic order. + counters = src.counters; + failCause = src.failCause; + jobId = src.jobId; + jobInfo = src.jobInfo; + mrPlan = src.mrPlan; + pendingSplits = src.pendingSplits; + pendingReducers = src.pendingReducers; + phase = src.phase; + reducersAddrs = src.reducersAddrs; + submitNodeId = src.submitNodeId; + ver = src.ver + 1; + } + + /** + * @return Submit node ID. + */ + public UUID submitNodeId() { + return submitNodeId; + } + + /** + * @param phase Job phase. + */ + public void phase(GridHadoopJobPhase phase) { + this.phase = phase; + } + + /** + * @return Job phase. + */ + public GridHadoopJobPhase phase() { + return phase; + } + + /** + * Gets reducers addresses for external execution. + * + * @return Reducers addresses. + */ + public Map<Integer, HadoopProcessDescriptor> reducersAddresses() { + return reducersAddrs; + } + + /** + * Sets reducers addresses for external execution. + * + * @param reducersAddrs Map of addresses. + */ + public void reducersAddresses(Map<Integer, HadoopProcessDescriptor> reducersAddrs) { + this.reducersAddrs = reducersAddrs; + } + + /** + * Sets collection of pending splits. + * + * @param pendingSplits Collection of pending splits. + */ + public void pendingSplits(Map<GridHadoopInputSplit, Integer> pendingSplits) { + this.pendingSplits = pendingSplits; + } + + /** + * Gets collection of pending splits. + * + * @return Collection of pending splits. + */ + public Map<GridHadoopInputSplit, Integer> pendingSplits() { + return pendingSplits; + } + + /** + * Sets collection of pending reducers. + * + * @param pendingReducers Collection of pending reducers. + */ + public void pendingReducers(Collection<Integer> pendingReducers) { + this.pendingReducers = pendingReducers; + } + + /** + * Gets collection of pending reducers. + * + * @return Collection of pending reducers. + */ + public Collection<Integer> pendingReducers() { + return pendingReducers; + } + + /** + * @return Job ID. + */ + public GridHadoopJobId jobId() { + return jobId; + } + + /** + * @param mrPlan Map-reduce plan. + */ + public void mapReducePlan(GridHadoopMapReducePlan mrPlan) { + assert this.mrPlan == null : "Map-reduce plan can only be initialized once."; + + this.mrPlan = mrPlan; + } + + /** + * @return Map-reduce plan. + */ + public GridHadoopMapReducePlan mapReducePlan() { + return mrPlan; + } + + /** + * @return Job info. + */ + public GridHadoopJobInfo jobInfo() { + return jobInfo; + } + + /** + * Returns job counters. + * + * @return Collection of counters. + */ + public GridHadoopCounters counters() { + return counters; + } + + /** + * Sets counters. + * + * @param counters Collection of counters. + */ + public void counters(GridHadoopCounters counters) { + this.counters = counters; + } + + /** + * @param failCause Fail cause. + */ + public void failCause(Throwable failCause) { + assert failCause != null; + + if (this.failCause == null) // Keep the first error. + this.failCause = failCause; + } + + /** + * @return Fail cause. + */ + public Throwable failCause() { + return failCause; + } + + /** + * @return Version. + */ + public long version() { + return ver; + } + + /** + * @param split Split. + * @return Task number. + */ + public int taskNumber(GridHadoopInputSplit split) { + return pendingSplits.get(split); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeUuid(out, submitNodeId); + out.writeObject(jobId); + out.writeObject(jobInfo); + out.writeObject(mrPlan); + out.writeObject(pendingSplits); + out.writeObject(pendingReducers); + out.writeObject(phase); + out.writeObject(failCause); + out.writeLong(ver); + out.writeObject(reducersAddrs); + out.writeObject(counters); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + submitNodeId = U.readUuid(in); + jobId = (GridHadoopJobId)in.readObject(); + jobInfo = (GridHadoopJobInfo)in.readObject(); + mrPlan = (GridHadoopMapReducePlan)in.readObject(); + pendingSplits = (Map<GridHadoopInputSplit,Integer>)in.readObject(); + pendingReducers = (Collection<Integer>)in.readObject(); + phase = (GridHadoopJobPhase)in.readObject(); + failCause = (Throwable)in.readObject(); + ver = in.readLong(); + reducersAddrs = (Map<Integer, HadoopProcessDescriptor>)in.readObject(); + counters = (GridHadoopCounters)in.readObject(); + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(HadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(), + "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null : + failCause.getClass().getName()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java index 91a2d6f..a0ae3f6 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -46,7 +46,7 @@ import java.util.concurrent.atomic.*; import static java.util.concurrent.TimeUnit.*; import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*; import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*; -import static org.apache.ignite.internal.processors.hadoop.taskexecutor.GridHadoopTaskState.*; +import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.*; /** * Hadoop job tracker. @@ -56,10 +56,10 @@ public class HadoopJobTracker extends HadoopComponent { private final GridMutex mux = new GridMutex(); /** */ - private volatile GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> jobMetaPrj; + private volatile GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> jobMetaPrj; /** Projection with expiry policy for finished job updates. */ - private volatile GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaPrj; + private volatile GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> finishedJobMetaPrj; /** Map-reduce execution planner. */ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") @@ -106,8 +106,8 @@ public class HadoopJobTracker extends HadoopComponent { * @return Job meta projection. */ @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") - private GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> jobMetaCache() { - GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> prj = jobMetaPrj; + private GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> jobMetaCache() { + GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> prj = jobMetaPrj; if (prj == null) { synchronized (mux) { @@ -128,8 +128,8 @@ public class HadoopJobTracker extends HadoopComponent { throw new IllegalStateException(e); } - jobMetaPrj = prj = (GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata>) - sysCache.projection(GridHadoopJobId.class, GridHadoopJobMetadata.class); + jobMetaPrj = prj = (GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata>) + sysCache.projection(GridHadoopJobId.class, HadoopJobMetadata.class); if (ctx.configuration().getFinishedJobInfoTtl() > 0) { ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy( @@ -149,8 +149,8 @@ public class HadoopJobTracker extends HadoopComponent { /** * @return Projection with expiry policy for finished job updates. */ - private GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaCache() { - GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> prj = finishedJobMetaPrj; + private GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> finishedJobMetaCache() { + GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> prj = finishedJobMetaPrj; if (prj == null) { jobMetaCache(); @@ -169,9 +169,9 @@ public class HadoopJobTracker extends HadoopComponent { super.onKernalStart(); jobMetaCache().context().continuousQueries().executeInternalQuery( - new CacheEntryUpdatedListener<GridHadoopJobId, GridHadoopJobMetadata>() { + new CacheEntryUpdatedListener<GridHadoopJobId, HadoopJobMetadata>() { @Override public void onUpdated(final Iterable<CacheEntryEvent<? extends GridHadoopJobId, - ? extends GridHadoopJobMetadata>> evts) { + ? extends HadoopJobMetadata>> evts) { if (!busyLock.tryReadLock()) return; @@ -250,7 +250,7 @@ public class HadoopJobTracker extends HadoopComponent { GridHadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null); - GridHadoopJobMetadata meta = new GridHadoopJobMetadata(ctx.localNodeId(), jobId, info); + HadoopJobMetadata meta = new HadoopJobMetadata(ctx.localNodeId(), jobId, info); meta.mapReducePlan(mrPlan); @@ -268,7 +268,7 @@ public class HadoopJobTracker extends HadoopComponent { long jobStart = U.currentTimeMillis(); - GridHadoopPerformanceCounter perfCntr = GridHadoopPerformanceCounter.getCounter(meta.counters(), + HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(meta.counters(), ctx.localNodeId()); perfCntr.clientSubmissionEvents(info); @@ -297,7 +297,7 @@ public class HadoopJobTracker extends HadoopComponent { * @return Status. */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public static GridHadoopJobStatus status(GridHadoopJobMetadata meta) { + public static GridHadoopJobStatus status(HadoopJobMetadata meta) { GridHadoopJobInfo jobInfo = meta.jobInfo(); return new GridHadoopJobStatus( @@ -325,7 +325,7 @@ public class HadoopJobTracker extends HadoopComponent { return null; // Grid is stopping. try { - GridHadoopJobMetadata meta = jobMetaCache().get(jobId); + HadoopJobMetadata meta = jobMetaCache().get(jobId); return meta != null ? status(meta) : null; } @@ -346,7 +346,7 @@ public class HadoopJobTracker extends HadoopComponent { return null; // Grid is stopping. try { - GridHadoopJobMetadata meta = jobMetaCache().get(jobId); + HadoopJobMetadata meta = jobMetaCache().get(jobId); if (meta == null) return null; @@ -400,7 +400,7 @@ public class HadoopJobTracker extends HadoopComponent { return null; try { - GridHadoopJobMetadata meta = jobMetaCache().get(jobId); + HadoopJobMetadata meta = jobMetaCache().get(jobId); if (meta != null) return meta.mapReducePlan(); @@ -419,7 +419,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param status Task status. */ @SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) - public void onTaskFinished(GridHadoopTaskInfo info, GridHadoopTaskStatus status) { + public void onTaskFinished(GridHadoopTaskInfo info, HadoopTaskStatus status) { if (!busyLock.tryReadLock()) return; @@ -470,7 +470,7 @@ public class HadoopJobTracker extends HadoopComponent { case COMMIT: case ABORT: { - GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> cache = finishedJobMetaCache(); + GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> cache = finishedJobMetaCache(); cache.invokeAsync(info.jobId(), new UpdatePhaseProcessor(incrCntrs, PHASE_COMPLETE)). listenAsync(failsLog); @@ -488,7 +488,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param jobId Job id. * @param c Closure of operation. */ - private void transform(GridHadoopJobId jobId, EntryProcessor<GridHadoopJobId, GridHadoopJobMetadata, Void> c) { + private void transform(GridHadoopJobId jobId, EntryProcessor<GridHadoopJobId, HadoopJobMetadata, Void> c) { jobMetaCache().invokeAsync(jobId, c).listenAsync(failsLog); } @@ -500,7 +500,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param desc Process descriptor. */ public void onExternalMappersInitialized(GridHadoopJobId jobId, Collection<Integer> reducers, - GridHadoopProcessDescriptor desc) { + HadoopProcessDescriptor desc) { transform(jobId, new InitializeReducersProcessor(null, reducers, desc)); } @@ -557,7 +557,7 @@ public class HadoopJobTracker extends HadoopComponent { // Iteration over all local entries is correct since system cache is REPLICATED. for (Object metaObj : jobMetaCache().values()) { - GridHadoopJobMetadata meta = (GridHadoopJobMetadata)metaObj; + HadoopJobMetadata meta = (HadoopJobMetadata)metaObj; GridHadoopJobId jobId = meta.jobId(); @@ -626,13 +626,13 @@ public class HadoopJobTracker extends HadoopComponent { * @throws IgniteCheckedException If failed. */ private void processJobMetadataUpdates( - Iterable<CacheEntryEvent<? extends GridHadoopJobId, ? extends GridHadoopJobMetadata>> updated) + Iterable<CacheEntryEvent<? extends GridHadoopJobId, ? extends HadoopJobMetadata>> updated) throws IgniteCheckedException { UUID locNodeId = ctx.localNodeId(); - for (CacheEntryEvent<? extends GridHadoopJobId, ? extends GridHadoopJobMetadata> entry : updated) { + for (CacheEntryEvent<? extends GridHadoopJobId, ? extends HadoopJobMetadata> entry : updated) { GridHadoopJobId jobId = entry.getKey(); - GridHadoopJobMetadata meta = entry.getValue(); + HadoopJobMetadata meta = entry.getValue(); if (meta == null || !ctx.isParticipating(meta)) continue; @@ -689,7 +689,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param locNodeId Local node ID. * @throws IgniteCheckedException If failed. */ - private void processJobMetaUpdate(GridHadoopJobId jobId, GridHadoopJobMetadata meta, UUID locNodeId) + private void processJobMetaUpdate(GridHadoopJobId jobId, HadoopJobMetadata meta, UUID locNodeId) throws IgniteCheckedException { JobLocalState state = activeJobs.get(jobId); @@ -879,7 +879,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param meta Job metadata. * @return Collection of created task infos or {@code null} if no mapper tasks scheduled for local node. */ - private Collection<GridHadoopTaskInfo> mapperTasks(Iterable<GridHadoopInputSplit> mappers, GridHadoopJobMetadata meta) { + private Collection<GridHadoopTaskInfo> mapperTasks(Iterable<GridHadoopInputSplit> mappers, HadoopJobMetadata meta) { UUID locNodeId = ctx.localNodeId(); GridHadoopJobId jobId = meta.jobId(); @@ -978,7 +978,7 @@ public class HadoopJobTracker extends HadoopComponent { try { if (jobInfo == null) { - GridHadoopJobMetadata meta = jobMetaCache().get(jobId); + HadoopJobMetadata meta = jobMetaCache().get(jobId); if (meta == null) throw new IgniteCheckedException("Failed to find job metadata for ID: " + jobId); @@ -1024,7 +1024,7 @@ public class HadoopJobTracker extends HadoopComponent { return false; // Grid is stopping. try { - GridHadoopJobMetadata meta = jobMetaCache().get(jobId); + HadoopJobMetadata meta = jobMetaCache().get(jobId); if (meta != null && meta.phase() != PHASE_COMPLETE && meta.phase() != PHASE_CANCELLING) { HadoopTaskCancelledException err = new HadoopTaskCancelledException("Job cancelled."); @@ -1063,7 +1063,7 @@ public class HadoopJobTracker extends HadoopComponent { return null; try { - final GridHadoopJobMetadata meta = jobMetaCache().get(jobId); + final HadoopJobMetadata meta = jobMetaCache().get(jobId); return meta != null ? meta.counters() : null; } @@ -1158,7 +1158,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param status Task status. * @param prev Previous closure. */ - private void onSetupFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) { + private void onSetupFinished(final GridHadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) { final GridHadoopJobId jobId = taskInfo.jobId(); if (status.state() == FAILED || status.state() == CRASHED) @@ -1172,7 +1172,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param status Task status. * @param prev Previous closure. */ - private void onMapFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, + private void onMapFinished(final GridHadoopTaskInfo taskInfo, HadoopTaskStatus status, final StackedProcessor prev) { final GridHadoopJobId jobId = taskInfo.jobId(); @@ -1213,7 +1213,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param status Task status. * @param prev Previous closure. */ - private void onReduceFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) { + private void onReduceFinished(GridHadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) { GridHadoopJobId jobId = taskInfo.jobId(); if (status.state() == FAILED || status.state() == CRASHED) // Fail the whole job. @@ -1227,7 +1227,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param status Task status. * @param prev Previous closure. */ - private void onCombineFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, + private void onCombineFinished(GridHadoopTaskInfo taskInfo, HadoopTaskStatus status, final StackedProcessor prev) { final GridHadoopJobId jobId = taskInfo.jobId(); @@ -1302,7 +1302,7 @@ public class HadoopJobTracker extends HadoopComponent { } /** {@inheritDoc} */ - @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { + @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { cp.phase(phase); } } @@ -1343,7 +1343,7 @@ public class HadoopJobTracker extends HadoopComponent { } /** {@inheritDoc} */ - @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { + @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { Map<GridHadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits()); for (GridHadoopInputSplit s : splits) @@ -1400,7 +1400,7 @@ public class HadoopJobTracker extends HadoopComponent { } /** {@inheritDoc} */ - @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { + @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { Collection<Integer> rdcCp = new HashSet<>(cp.pendingReducers()); rdcCp.remove(rdc); @@ -1425,7 +1425,7 @@ public class HadoopJobTracker extends HadoopComponent { private final Collection<Integer> rdc; /** Process descriptor for reducers. */ - private final GridHadoopProcessDescriptor desc; + private final HadoopProcessDescriptor desc; /** * @param prev Previous closure. @@ -1434,7 +1434,7 @@ public class HadoopJobTracker extends HadoopComponent { */ private InitializeReducersProcessor(@Nullable StackedProcessor prev, Collection<Integer> rdc, - GridHadoopProcessDescriptor desc) { + HadoopProcessDescriptor desc) { super(prev); assert !F.isEmpty(rdc); @@ -1445,11 +1445,11 @@ public class HadoopJobTracker extends HadoopComponent { } /** {@inheritDoc} */ - @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { - Map<Integer, GridHadoopProcessDescriptor> oldMap = meta.reducersAddresses(); + @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { + Map<Integer, HadoopProcessDescriptor> oldMap = meta.reducersAddresses(); - Map<Integer, GridHadoopProcessDescriptor> rdcMap = oldMap == null ? - new HashMap<Integer, GridHadoopProcessDescriptor>() : new HashMap<>(oldMap); + Map<Integer, HadoopProcessDescriptor> rdcMap = oldMap == null ? + new HashMap<Integer, HadoopProcessDescriptor>() : new HashMap<>(oldMap); for (Integer r : rdc) rdcMap.put(r, desc); @@ -1511,7 +1511,7 @@ public class HadoopJobTracker extends HadoopComponent { } /** {@inheritDoc} */ - @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { + @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { assert meta.phase() == PHASE_CANCELLING || err != null: "Invalid phase for cancel: " + meta; Collection<Integer> rdcCp = new HashSet<>(cp.pendingReducers()); @@ -1560,8 +1560,8 @@ public class HadoopJobTracker extends HadoopComponent { } /** {@inheritDoc} */ - @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { - GridHadoopCounters cntrs = new GridHadoopCountersImpl(cp.counters()); + @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { + GridHadoopCounters cntrs = new HadoopCountersImpl(cp.counters()); cntrs.merge(counters); @@ -1573,7 +1573,7 @@ public class HadoopJobTracker extends HadoopComponent { * Abstract stacked closure. */ private abstract static class StackedProcessor implements - EntryProcessor<GridHadoopJobId, GridHadoopJobMetadata, Void>, Serializable { + EntryProcessor<GridHadoopJobId, HadoopJobMetadata, Void>, Serializable { /** */ private static final long serialVersionUID = 0L; @@ -1588,8 +1588,8 @@ public class HadoopJobTracker extends HadoopComponent { } /** {@inheritDoc} */ - @Override public Void process(MutableEntry<GridHadoopJobId, GridHadoopJobMetadata> e, Object... args) { - GridHadoopJobMetadata val = apply(e.getValue()); + @Override public Void process(MutableEntry<GridHadoopJobId, HadoopJobMetadata> e, Object... args) { + HadoopJobMetadata val = apply(e.getValue()); if (val != null) e.setValue(val); @@ -1603,11 +1603,11 @@ public class HadoopJobTracker extends HadoopComponent { * @param meta Old value. * @return New value. */ - private GridHadoopJobMetadata apply(GridHadoopJobMetadata meta) { + private HadoopJobMetadata apply(HadoopJobMetadata meta) { if (meta == null) return null; - GridHadoopJobMetadata cp = prev != null ? prev.apply(meta) : new GridHadoopJobMetadata(meta); + HadoopJobMetadata cp = prev != null ? prev.apply(meta) : new HadoopJobMetadata(meta); update(meta, cp); @@ -1620,6 +1620,6 @@ public class HadoopJobTracker extends HadoopComponent { * @param meta Initial job metadata. * @param cp Copy. */ - protected abstract void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp); + protected abstract void update(HadoopJobMetadata meta, HadoopJobMetadata cp); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/GridHadoopMessage.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/GridHadoopMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/GridHadoopMessage.java deleted file mode 100644 index 1670a8a..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/GridHadoopMessage.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.message; - -import java.io.*; - -/** - * Marker interface for all hadoop messages. - */ -public interface GridHadoopMessage extends Externalizable { - // No-op. -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java new file mode 100644 index 0000000..cab6138 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.message; + +import java.io.*; + +/** + * Marker interface for all hadoop messages. + */ +public interface HadoopMessage extends Externalizable { + // No-op. +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlan.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlan.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlan.java deleted file mode 100644 index 7988403..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlan.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.planner; - -import org.apache.ignite.internal.processors.hadoop.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Map-reduce plan. - */ -public class GridHadoopDefaultMapReducePlan implements GridHadoopMapReducePlan { - /** */ - private static final long serialVersionUID = 0L; - - /** Mappers map. */ - private Map<UUID, Collection<GridHadoopInputSplit>> mappers; - - /** Reducers map. */ - private Map<UUID, int[]> reducers; - - /** Mappers count. */ - private int mappersCnt; - - /** Reducers count. */ - private int reducersCnt; - - /** - * @param mappers Mappers map. - * @param reducers Reducers map. - */ - public GridHadoopDefaultMapReducePlan(Map<UUID, Collection<GridHadoopInputSplit>> mappers, - Map<UUID, int[]> reducers) { - this.mappers = mappers; - this.reducers = reducers; - - if (mappers != null) { - for (Collection<GridHadoopInputSplit> splits : mappers.values()) - mappersCnt += splits.size(); - } - - if (reducers != null) { - for (int[] rdcrs : reducers.values()) - reducersCnt += rdcrs.length; - } - } - - /** {@inheritDoc} */ - @Override public int mappers() { - return mappersCnt; - } - - /** {@inheritDoc} */ - @Override public int reducers() { - return reducersCnt; - } - - /** {@inheritDoc} */ - @Override public UUID nodeForReducer(int reducer) { - assert reducer >= 0 && reducer < reducersCnt : reducer; - - for (Map.Entry<UUID, int[]> entry : reducers.entrySet()) { - for (int r : entry.getValue()) { - if (r == reducer) - return entry.getKey(); - } - } - - throw new IllegalStateException("Not found reducer index: " + reducer); - } - - /** {@inheritDoc} */ - @Override @Nullable public Collection<GridHadoopInputSplit> mappers(UUID nodeId) { - return mappers.get(nodeId); - } - - /** {@inheritDoc} */ - @Override @Nullable public int[] reducers(UUID nodeId) { - return reducers.get(nodeId); - } - - /** {@inheritDoc} */ - @Override public Collection<UUID> mapperNodeIds() { - return mappers.keySet(); - } - - /** {@inheritDoc} */ - @Override public Collection<UUID> reducerNodeIds() { - return reducers.keySet(); - } -}