http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6dbf2953/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/v2/IgniteHadoopFileSystem.java
new file mode 100644
index 0000000..fecfee5
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -0,0 +1,1008 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite_new.hadoop.fs.v2;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.hadoop.*;
+import org.apache.ignite.internal.igfs.common.*;
+import org.apache.ignite.internal.igfs.hadoop.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.IgniteFs.*;
+import static org.apache.ignite.configuration.IgfsConfiguration.*;
+import static org.apache.ignite.igfs.IgfsMode.*;
+import static org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.*;
+import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*;
+
+/**
+ * {@code IGFS} Hadoop 2.x file system driver over file system API. To use
+ * {@code IGFS} as a Hadoop file system, you should configure this class
+ * in Hadoop's {@code core-site.xml} as follows:
+ * <pre name="code" class="xml">
+ *  <property>
+ *    <name>fs.default.name</name>
+ *    <value>igfs://ipc</value>
+ *  </property>
+ *
+ *  <property>
+ *    <name>fs.igfs.impl</name>
+ *    <value>org.apache.ignite.igfs.hadoop.IgfsHadoopFileSystem</value>
+ *  </property>
+ * </pre>
+ * You should also add the Ignite JAR and all Ignite libraries to the Hadoop classpath.
+ * To do this, add the following lines to the {@code conf/hadoop-env.sh} script in the Hadoop
+ * distribution:
+ * <pre name="code" class="bash">
+ * export IGNITE_HOME=/path/to/Ignite/distribution
+ * export HADOOP_CLASSPATH=$IGNITE_HOME/ignite*.jar
+ *
+ * for f in $IGNITE_HOME/libs/*.jar; do
+ *  export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f;
+ * done
+ * </pre>
+ * <h1 class="header">Data vs Client Nodes</h1>
+ * Hadoop needs to use its FileSystem remotely from client nodes as well as directly on
+ * data nodes. Client nodes are responsible for basic file system operations as well as
+ * accessing data nodes remotely. Usually, client nodes are started together
+ * with {@code job-submitter} or {@code job-scheduler} processes, while data nodes are started
+ * together with Hadoop {@code task-tracker} processes.
+ * <p>
+ * For sample client and data node configuration refer to the {@code config/hadoop/default-config-client.xml}
+ * and {@code config/hadoop/default-config.xml} configuration files in the Ignite installation.
+ */
+public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closeable {
+    /** Logger. */
+    private static final Log LOG = LogFactory.getLog(IgniteHadoopFileSystem.class);
+
+    /** Ensures that close routine is invoked at most once. */
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    /** Grid remote client. */
+    private IgfsHadoopWrapper rmtClient;
+
+    /** Working directory. */
+    private IgfsPath workingDir;
+
+    /** URI. */
+    private URI uri;
+
+    /** Authority. */
+    private String uriAuthority;
+
+    /** Client logger. */
+    private IgfsLogger clientLog;
+
+    /** Server block size. */
+    private long grpBlockSize;
+
+    /** Default replication factor. */
+    private short dfltReplication;
+
+    /** Secondary URI string. */
+    private URI secondaryUri;
+
+    /** Mode resolver. */
+    private IgfsModeResolver modeRslvr;
+
+    /** Secondary file system instance. */
+    private AbstractFileSystem secondaryFs;
+
+    /** Whether a custom value for sequential reads before prefetch is provided. */
+    private boolean seqReadsBeforePrefetchOverride;
+
+    /** Custom-provided sequential reads before prefetch. */
+    private int seqReadsBeforePrefetch;
+
+    /** Flag that controls whether file writes should be colocated on a data node. */
+    private boolean colocateFileWrites;
+
+    /** Prefer local writes. */
+    private boolean preferLocFileWrites;
+
+    /**
+     * @param name URI for file system.
+     * @param cfg Configuration.
+     * @throws java.net.URISyntaxException If name has invalid syntax.
+     * @throws java.io.IOException If initialization failed.
+     */
+    public IgniteHadoopFileSystem(URI name, Configuration cfg) throws URISyntaxException, IOException {
+        super(IgfsHadoopEndpoint.normalize(name), IGFS_SCHEME, false, -1);
+
+        uri = name;
+
+        try {
+            initialize(name, cfg);
+        }
+        catch (IOException e) {
+            // Close client if exception occurred.
+            if (rmtClient != null)
+                rmtClient.close(false);
+
+            throw e;
+        }
+
+        workingDir = new IgfsPath("/user/" + cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void checkPath(Path path) {
+        URI uri = path.toUri();
+
+        if (uri.isAbsolute()) {
+            if (!F.eq(uri.getScheme(), IGFS_SCHEME))
+                throw new InvalidPathException("Wrong path scheme [expected=" + IGFS_SCHEME + ", actual=" +
+                    uri.getScheme() + ']');
+
+            if (!F.eq(uri.getAuthority(), uriAuthority))
+                throw new InvalidPathException("Wrong path authority [expected=" + uriAuthority + ", actual=" +
+                    uri.getAuthority() + ']');
+        }
+    }
+
+    /**
+     * Public setter that can be used by direct users of FS or Visor.
+     *
+     * @param colocateFileWrites Whether all ongoing file writes should be colocated.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void colocateFileWrites(boolean colocateFileWrites) {
+        this.colocateFileWrites = colocateFileWrites;
+    }
+
+    /**
+     * Enter busy state.
+     *
+     * @throws java.io.IOException If file system is stopped.
+     */
+    private void enterBusy() throws IOException {
+        if (closeGuard.get())
+            throw new IOException("File system is stopped.");
+    }
+
+    /**
+     * Leave busy state.
+     */
+    private void leaveBusy() {
+        // No-op.
+    }
+
+    /**
+     * @param name URI passed to constructor.
+     * @param cfg Configuration passed to constructor.
+     * @throws java.io.IOException If initialization failed.
+     */
+    private void initialize(URI name, Configuration cfg) throws IOException {
+        enterBusy();
+
+        try {
+            if (rmtClient != null)
+                throw new IOException("File system is already initialized: " + rmtClient);
+
+            A.notNull(name, "name");
+            A.notNull(cfg, "cfg");
+
+            if (!IGFS_SCHEME.equals(name.getScheme()))
+                throw new IOException("Illegal file system URI [expected=" + IGFS_SCHEME +
+                    "://[name]/[optional_path], actual=" + name + ']');
+
+            uriAuthority = name.getAuthority();
+
+            // Override sequential reads before prefetch if needed.
+            seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
+
+            if (seqReadsBeforePrefetch > 0)
+                seqReadsBeforePrefetchOverride = true;
+
+            // In Ignite replication factor is controlled by data cache affinity.
+            // We use replication factor to force the whole file to be stored on the local node.
+            dfltReplication = (short)cfg.getInt("dfs.replication", 3);
+
+            // Get file colocation control flag.
+            colocateFileWrites = parameter(cfg, PARAM_IGFS_COLOCATED_WRITES, uriAuthority, false);
+            preferLocFileWrites = cfg.getBoolean(PARAM_IGFS_PREFER_LOCAL_WRITES, false);
+
+            // Get log directory.
+            String logDirCfg = parameter(cfg, PARAM_IGFS_LOG_DIR, uriAuthority, DFLT_IGFS_LOG_DIR);
+
+            File logDirFile = U.resolveIgnitePath(logDirCfg);
+
+            String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
+
+            rmtClient = new IgfsHadoopWrapper(uriAuthority, logDir, cfg, LOG);
+
+            // Handshake.
+            IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
+
+            grpBlockSize = handshake.blockSize();
+
+            IgfsPaths paths = handshake.secondaryPaths();
+
+            Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false);
+
+            if (handshake.sampling() != null ? handshake.sampling() : logEnabled) {
+                // Initiate client logger.
+                if (logDir == null)
+                    throw new IOException("Failed to resolve log directory: " + logDirCfg);
+
+                Integer batchSize = parameter(cfg, PARAM_IGFS_LOG_BATCH_SIZE, uriAuthority, DFLT_IGFS_LOG_BATCH_SIZE);
+
+                clientLog = IgfsLogger.logger(uriAuthority, handshake.igfsName(), logDir, batchSize);
+            }
+            else
+                clientLog = IgfsLogger.disabledLogger();
+
+            modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes());
+
+            boolean initSecondary = paths.defaultMode() == PROXY;
+
+            if (paths.pathModes() != null) {
+                for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) {
+                    IgfsMode mode = pathMode.getValue();
+
+                    initSecondary |= mode == PROXY;
+                }
+            }
+
+            if (initSecondary) {
+                Map<String, String> props = paths.properties();
+
+                String secUri = props.get(IgfsHadoopFileSystemWrapper.SECONDARY_FS_URI);
+                String secConfPath = props.get(IgfsHadoopFileSystemWrapper.SECONDARY_FS_CONFIG_PATH);
+
+                if (secConfPath == null)
+                    throw new IOException("Failed to connect to the secondary file system because configuration " +
+                        "path is not provided.");
+
+                if (secUri == null)
+                    throw new IOException("Failed to connect to the secondary file system because URI is not " +
+                        "provided.");
+
+                try {
+                    secondaryUri = new URI(secUri);
+
+                    URL secondaryCfgUrl = U.resolveIgniteUrl(secConfPath);
+
+                    if (secondaryCfgUrl == null)
+                        throw new IOException("Failed to resolve secondary file system config URL: " + secConfPath);
+
+                    Configuration conf = new Configuration();
+
+                    conf.addResource(secondaryCfgUrl);
+
+                    String prop = String.format("fs.%s.impl.disable.cache", secondaryUri.getScheme());
+
+                    conf.setBoolean(prop, true);
+
+                    secondaryFs = AbstractFileSystem.get(secondaryUri, conf);
+                }
+                catch (URISyntaxException ignore) {
+                    throw new IOException("Failed to resolve secondary file system URI: " + secUri);
+                }
+                catch (IOException e) {
+                    throw new IOException("Failed to connect to the secondary file system: " + secUri, e);
+                }
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IOException {
+        if (closeGuard.compareAndSet(false, true)) {
+            if (rmtClient == null)
+                return;
+
+            rmtClient.close(false);
+
+            if (clientLog.isLogEnabled())
+                clientLog.close();
+
+            // Reset initialized resources.
+            rmtClient = null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public URI getUri() {
+        return uri;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getUriDefaultPort() {
+        return -1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public FsServerDefaults getServerDefaults() throws IOException {
+        return new FsServerDefaults(grpBlockSize, (int)grpBlockSize, (int)grpBlockSize, dfltReplication, 64 * 1024,
+            false, 0, DataChecksum.Type.NULL);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean setReplication(Path f, short replication) throws IOException {
+        return mode(f) == PROXY && secondaryFs.setReplication(f, replication);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setTimes(Path f, long mtime, long atime) throws IOException {
+        if (mode(f) == PROXY)
+            secondaryFs.setTimes(f, mtime, atime);
+        else {
+            if (mtime == -1 && atime == -1)
+                return;
+
+            rmtClient.setTimes(convert(f), atime, mtime);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public FsStatus getFsStatus() throws IOException {
+        IgfsStatus status = rmtClient.fsStatus();
+
+        return new FsStatus(status.spaceTotal(), status.spaceUsed(), status.spaceTotal() - status.spaceUsed());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setPermission(Path p, FsPermission perm) throws IOException {
+        enterBusy();
+
+        try {
+            A.notNull(p, "p");
+
+            if (mode(p) == PROXY)
+                secondaryFs.setPermission(toSecondary(p), perm);
+            else {
+                if (rmtClient.update(convert(p), permission(perm)) == null)
+                    throw new IOException("Failed to set file permission (file not found?)" +
+                        " [path=" + p + ", perm=" + perm + ']');
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setOwner(Path p, String usr, String grp) throws IOException {
+        A.notNull(p, "p");
+        A.notNull(usr, "username");
+        A.notNull(grp, "grpName");
+
+        enterBusy();
+
+        try {
+            if (mode(p) == PROXY)
+                secondaryFs.setOwner(toSecondary(p), usr, grp);
+            else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, usr, PROP_GROUP_NAME, grp)) == null)
+                throw new IOException("Failed to set file owner (file not found?)" +
+                    " [path=" + p + ", username=" + usr + ", grpName=" + grp + ']');
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public FSDataInputStream open(Path f, int bufSize) throws IOException {
+        A.notNull(f, "f");
+
+        enterBusy();
+
+        try {
+            IgfsPath path = convert(f);
+            IgfsMode mode = modeRslvr.resolveMode(path);
+
+            if (mode == PROXY) {
+                FSDataInputStream is = secondaryFs.open(toSecondary(f), bufSize);
+
+                if (clientLog.isLogEnabled()) {
+                    // At this point we do not know the file size, so we perform an additional request to the remote FS to get it.
+                    FileStatus status = secondaryFs.getFileStatus(toSecondary(f));
+
+                    long size = status != null ? status.getLen() : -1;
+
+                    long logId = IgfsLogger.nextId();
+
+                    clientLog.logOpen(logId, path, PROXY, bufSize, size);
+
+                    return new FSDataInputStream(new IgfsHadoopProxyInputStream(is, clientLog, logId));
+                }
+                else
+                    return is;
+            }
+            else {
+                IgfsHadoopStreamDelegate stream = seqReadsBeforePrefetchOverride ?
+                    rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path);
+
+                long logId = -1;
+
+                if (clientLog.isLogEnabled()) {
+                    logId = IgfsLogger.nextId();
+
+                    clientLog.logOpen(logId, path, mode, bufSize, stream.length());
+                }
+
+                if (LOG.isDebugEnabled())
+                    LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path +
+                        ", bufSize=" + bufSize + ']');
+
+                IgfsHadoopInputStream igfsIn = new IgfsHadoopInputStream(stream, stream.length(),
+                    bufSize, LOG, clientLog, logId);
+
+                if (LOG.isDebugEnabled())
+                    LOG.debug("Opened input stream [path=" + path + ", delegate=" + stream + ']');
+
+                return new FSDataInputStream(igfsIn);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("deprecation")
+    @Override public FSDataOutputStream createInternal(
+        Path f,
+        EnumSet<CreateFlag> flag,
+        FsPermission perm,
+        int bufSize,
+        short replication,
+        long blockSize,
+        Progressable progress,
+        Options.ChecksumOpt checksumOpt,
+        boolean createParent
+    ) throws IOException {
+        A.notNull(f, "f");
+
+        enterBusy();
+
+        boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+        boolean append = flag.contains(CreateFlag.APPEND);
+        boolean create = flag.contains(CreateFlag.CREATE);
+
+        OutputStream out = null;
+
+        try {
+            IgfsPath path = convert(f);
+            IgfsMode mode = modeRslvr.resolveMode(path);
+
+            if (LOG.isDebugEnabled())
+                LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + ", path=" +
+                    path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']');
+
+            if (mode == PROXY) {
+                FSDataOutputStream os = secondaryFs.createInternal(toSecondary(f), flag, perm, bufSize,
+                    replication, blockSize, progress, checksumOpt, createParent);
+
+                if (clientLog.isLogEnabled()) {
+                    long logId = IgfsLogger.nextId();
+
+                    if (append)
+                        clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID.
+                    else
+                        clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize);
+
+                    return new FSDataOutputStream(new IgfsHadoopProxyOutputStream(os, clientLog, logId));
+                }
+                else
+                    return os;
+            }
+            else {
+                Map<String, String> permMap = F.asMap(PROP_PERMISSION, toString(perm),
+                    PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites));
+
+                // Create the stream; it is closed in the 'finally' section if any subsequent operation fails.
+                IgfsHadoopStreamDelegate stream;
+
+                long logId = -1;
+
+                if (append) {
+                    stream = rmtClient.append(path, create, permMap);
+
+                    if (clientLog.isLogEnabled()) {
+                        logId = IgfsLogger.nextId();
+
+                        clientLog.logAppend(logId, path, mode, bufSize);
+                    }
+
+                    if (LOG.isDebugEnabled())
+                        LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']');
+                }
+                else {
+                    stream = rmtClient.create(path, overwrite, colocateFileWrites, replication, blockSize,
+                        permMap);
+
+                    if (clientLog.isLogEnabled()) {
+                        logId = IgfsLogger.nextId();
+
+                        clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize);
+                    }
+
+                    if (LOG.isDebugEnabled())
+                        LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']');
+                }
+
+                assert stream != null;
+
+                IgfsHadoopOutputStream igfsOut = new IgfsHadoopOutputStream(stream, LOG,
+                    clientLog, logId);
+
+                bufSize = Math.max(64 * 1024, bufSize);
+
+                out = new BufferedOutputStream(igfsOut, bufSize);
+
+                FSDataOutputStream res = new FSDataOutputStream(out, null, 0);
+
+                // Mark stream created successfully.
+ out = null; + + return res; + } + } + finally { + // Close if failed during stream creation. + if (out != null) + U.closeQuiet(out); + + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public boolean supportsSymlinks() { + return false; + } + + /** {@inheritDoc} */ + @Override public void renameInternal(Path src, Path dst) throws IOException { + A.notNull(src, "src"); + A.notNull(dst, "dst"); + + enterBusy(); + + try { + IgfsPath srcPath = convert(src); + IgfsPath dstPath = convert(dst); + Set<IgfsMode> childrenModes = modeRslvr.resolveChildrenModes(srcPath); + + if (childrenModes.contains(PROXY)) { + if (clientLog.isLogEnabled()) + clientLog.logRename(srcPath, PROXY, dstPath); + + secondaryFs.renameInternal(toSecondary(src), toSecondary(dst)); + } + + rmtClient.rename(srcPath, dstPath); + + if (clientLog.isLogEnabled()) + clientLog.logRename(srcPath, modeRslvr.resolveMode(srcPath), dstPath); + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public boolean delete(Path f, boolean recursive) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + try { + IgfsPath path = convert(f); + IgfsMode mode = modeRslvr.resolveMode(path); + Set<IgfsMode> childrenModes = modeRslvr.resolveChildrenModes(path); + + if (childrenModes.contains(PROXY)) { + if (clientLog.isLogEnabled()) + clientLog.logDelete(path, PROXY, recursive); + + return secondaryFs.delete(toSecondary(f), recursive); + } + + boolean res = rmtClient.delete(path, recursive); + + if (clientLog.isLogEnabled()) + clientLog.logDelete(path, mode, recursive); + + return res; + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException { + // Checksum has effect for secondary FS only. 
+ if (secondaryFs != null) + secondaryFs.setVerifyChecksum(verifyChecksum); + } + + /** {@inheritDoc} */ + @Override public FileChecksum getFileChecksum(Path f) throws IOException { + if (mode(f) == PROXY) + return secondaryFs.getFileChecksum(f); + + return null; + } + + /** {@inheritDoc} */ + @Override public FileStatus[] listStatus(Path f) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + try { + IgfsPath path = convert(f); + IgfsMode mode = modeRslvr.resolveMode(path); + + if (mode == PROXY) { + FileStatus[] arr = secondaryFs.listStatus(toSecondary(f)); + + if (arr == null) + throw new FileNotFoundException("File " + f + " does not exist."); + + for (int i = 0; i < arr.length; i++) + arr[i] = toPrimary(arr[i]); + + if (clientLog.isLogEnabled()) { + String[] fileArr = new String[arr.length]; + + for (int i = 0; i < arr.length; i++) + fileArr[i] = arr[i].getPath().toString(); + + clientLog.logListDirectory(path, PROXY, fileArr); + } + + return arr; + } + else { + Collection<IgfsFile> list = rmtClient.listFiles(path); + + if (list == null) + throw new FileNotFoundException("File " + f + " does not exist."); + + List<IgfsFile> files = new ArrayList<>(list); + + FileStatus[] arr = new FileStatus[files.size()]; + + for (int i = 0; i < arr.length; i++) + arr[i] = convert(files.get(i)); + + if (clientLog.isLogEnabled()) { + String[] fileArr = new String[arr.length]; + + for (int i = 0; i < arr.length; i++) + fileArr[i] = arr[i].getPath().toString(); + + clientLog.logListDirectory(path, mode, fileArr); + } + + return arr; + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public void mkdir(Path f, FsPermission perm, boolean createParent) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + try { + IgfsPath path = convert(f); + IgfsMode mode = modeRslvr.resolveMode(path); + + if (mode == PROXY) { + if (clientLog.isLogEnabled()) + clientLog.logMakeDirectory(path, PROXY); + + secondaryFs.mkdir(toSecondary(f), perm, createParent); + } + else { + rmtClient.mkdirs(path, permission(perm)); + + if (clientLog.isLogEnabled()) + clientLog.logMakeDirectory(path, mode); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public FileStatus getFileStatus(Path f) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + try { + if (mode(f) == PROXY) + return toPrimary(secondaryFs.getFileStatus(toSecondary(f))); + else { + IgfsFile info = rmtClient.info(convert(f)); + + if (info == null) + throw new FileNotFoundException("File not found: " + f); + + return convert(info); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public BlockLocation[] getFileBlockLocations(Path path, long start, long len) throws IOException { + A.notNull(path, "path"); + + IgfsPath igfsPath = convert(path); + + enterBusy(); + + try { + if (modeRslvr.resolveMode(igfsPath) == PROXY) + return secondaryFs.getFileBlockLocations(path, start, len); + else { + long now = System.currentTimeMillis(); + + List<IgfsBlockLocation> affinity = new ArrayList<>( + rmtClient.affinity(igfsPath, start, len)); + + BlockLocation[] arr = new BlockLocation[affinity.size()]; + + for (int i = 0; i < arr.length; i++) + arr[i] = convert(affinity.get(i)); + + if (LOG.isDebugEnabled()) + LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" + + (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']'); + + return arr; + } + } + finally { + leaveBusy(); + } + } + + /** + * Resolve path mode. + * + * @param path HDFS path. 
+ * @return Path mode. + */ + public IgfsMode mode(Path path) { + return modeRslvr.resolveMode(convert(path)); + } + + /** + * Convert the given path to path acceptable by the primary file system. + * + * @param path Path. + * @return Primary file system path. + */ + private Path toPrimary(Path path) { + return convertPath(path, getUri()); + } + + /** + * Convert the given path to path acceptable by the secondary file system. + * + * @param path Path. + * @return Secondary file system path. + */ + private Path toSecondary(Path path) { + assert secondaryFs != null; + assert secondaryUri != null; + + return convertPath(path, secondaryUri); + } + + /** + * Convert path using the given new URI. + * + * @param path Old path. + * @param newUri New URI. + * @return New path. + */ + private Path convertPath(Path path, URI newUri) { + assert newUri != null; + + if (path != null) { + URI pathUri = path.toUri(); + + try { + return new Path(new URI(pathUri.getScheme() != null ? newUri.getScheme() : null, + pathUri.getAuthority() != null ? newUri.getAuthority() : null, pathUri.getPath(), null, null)); + } + catch (URISyntaxException e) { + throw new IgniteException("Failed to construct secondary file system path from the primary file " + + "system path: " + path, e); + } + } + else + return null; + } + + /** + * Convert a file status obtained from the secondary file system to a status of the primary file system. + * + * @param status Secondary file system status. + * @return Primary file system status. + */ + private FileStatus toPrimary(FileStatus status) { + return status != null ? new FileStatus(status.getLen(), status.isDirectory(), status.getReplication(), + status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(), + status.getOwner(), status.getGroup(), toPrimary(status.getPath())) : null; + } + + /** + * Convert IGFS path into Hadoop path. + * + * @param path IGFS path. + * @return Hadoop path. + */ + private Path convert(IgfsPath path) { + return new Path(IGFS_SCHEME, uriAuthority, path.toString()); + } + + /** + * Convert Hadoop path into IGFS path. + * + * @param path Hadoop path. + * @return IGFS path. + */ + @Nullable private IgfsPath convert(Path path) { + if (path == null) + return null; + + return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) : + new IgfsPath(workingDir, path.toUri().getPath()); + } + + /** + * Convert IGFS affinity block location into Hadoop affinity block location. + * + * @param block IGFS affinity block location. + * @return Hadoop affinity block location. + */ + private BlockLocation convert(IgfsBlockLocation block) { + Collection<String> names = block.names(); + Collection<String> hosts = block.hosts(); + + return new BlockLocation( + names.toArray(new String[names.size()]) /* hostname:portNumber of data nodes */, + hosts.toArray(new String[hosts.size()]) /* hostnames of data nodes */, + block.start(), block.length() + ) { + @Override public String toString() { + try { + return "BlockLocation [offset=" + getOffset() + ", length=" + getLength() + + ", hosts=" + Arrays.asList(getHosts()) + ", names=" + Arrays.asList(getNames()) + ']'; + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + } + + /** + * Convert IGFS file information into Hadoop file status. + * + * @param file IGFS file information. + * @return Hadoop file status. 
+ */ + private FileStatus convert(IgfsFile file) { + return new FileStatus( + file.length(), + file.isDirectory(), + dfltReplication, + file.groupBlockSize(), + file.modificationTime(), + file.accessTime(), + permission(file), + file.property(PROP_USER_NAME, DFLT_USER_NAME), + file.property(PROP_GROUP_NAME, "users"), + convert(file.path())) { + @Override public String toString() { + return "FileStatus [path=" + getPath() + ", isDir=" + isDirectory() + ", len=" + getLen() + "]"; + } + }; + } + + /** + * Convert Hadoop permission into IGFS file attribute. + * + * @param perm Hadoop permission. + * @return IGFS attributes. + */ + private Map<String, String> permission(FsPermission perm) { + if (perm == null) + perm = FsPermission.getDefault(); + + return F.asMap(PROP_PERMISSION, toString(perm)); + } + + /** + * @param perm Permission. + * @return String. + */ + private static String toString(FsPermission perm) { + return String.format("%04o", perm.toShort()); + } + + /** + * Convert IGFS file attributes into Hadoop permission. + * + * @param file File info. + * @return Hadoop permission. + */ + private FsPermission permission(IgfsFile file) { + String perm = file.property(PROP_PERMISSION, null); + + if (perm == null) + return FsPermission.getDefault(); + + try { + return new FsPermission((short)Integer.parseInt(perm, 8)); + } + catch (NumberFormatException ignore) { + return FsPermission.getDefault(); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteHadoopFileSystem.class, this); + } +}
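For reference, Hadoop 2.x reaches AbstractFileSystem drivers such as the one above through the FileContext API rather than the older FileSystem API. Below is a minimal client sketch, assuming core-site.xml is configured as in the class javadoc, that the fs.AbstractFileSystem.igfs.impl property points at this class, and that an IGFS server listens on the hypothetical igfs://ipc endpoint; the class and path names are illustrative only:

import java.net.URI;
import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;

public class IgfsFileContextSketch {
    public static void main(String[] args) throws Exception {
        // Assumes core-site.xml on the classpath maps the igfs:// scheme to the driver above.
        Configuration conf = new Configuration();

        // Hypothetical endpoint; must match the authority the IGFS server listens on.
        FileContext fc = FileContext.getFileContext(new URI("igfs://ipc"), conf);

        Path file = new Path("/user/test/hello.txt");

        // createInternal() above is invoked underneath; CreateOpts.createParent()
        // creates missing parent directories.
        try (FSDataOutputStream out = fc.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
            Options.CreateOpts.createParent())) {
            out.writeUTF("Hello, IGFS!");
        }

        // open() above resolves the path mode and dispatches to IGFS or the secondary FS.
        try (FSDataInputStream in = fc.open(file)) {
            System.out.println(in.readUTF());
        }
    }
}

Note that mode resolution (PRIMARY, DUAL or PROXY) happens per path inside the driver, so client code is identical whether or not a secondary file system is configured.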
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6dbf2953/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/IgniteHadoopCounterGroup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/IgniteHadoopCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/IgniteHadoopCounterGroup.java new file mode 100644 index 0000000..0507787 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/IgniteHadoopCounterGroup.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite_new.hadoop.mapreduce; + +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.counters.*; + +import java.io.*; +import java.util.*; + +/** + * Hadoop Client API Counters adapter. + */ +class IgniteHadoopCounterGroup implements CounterGroup { + /** Counters. */ + private final IgniteHadoopCounters cntrs; + + /** Group name. */ + private final String name; + + /** + * Creates new instance. + * + * @param cntrs Client counters instance. + * @param name Group name. + */ + IgniteHadoopCounterGroup(IgniteHadoopCounters cntrs, String name) { + this.cntrs = cntrs; + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String getName() { + return name; + } + + /** {@inheritDoc} */ + @Override public String getDisplayName() { + return name; + } + + /** {@inheritDoc} */ + @Override public void setDisplayName(String displayName) { + // No-op. 
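+ // Group display names are not tracked separately; getDisplayName() above returns the group name.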
+ } + + /** {@inheritDoc} */ + @Override public void addCounter(Counter counter) { + addCounter(counter.getName(), counter.getDisplayName(), 0); + } + + /** {@inheritDoc} */ + @Override public Counter addCounter(String name, String displayName, long value) { + final Counter counter = cntrs.findCounter(this.name, name); + + counter.setValue(value); + + return counter; + } + + /** {@inheritDoc} */ + @Override public Counter findCounter(String counterName, String displayName) { + return cntrs.findCounter(name, counterName); + } + + /** {@inheritDoc} */ + @Override public Counter findCounter(String counterName, boolean create) { + return cntrs.findCounter(name, counterName, create); + } + + /** {@inheritDoc} */ + @Override public Counter findCounter(String counterName) { + return cntrs.findCounter(name, counterName); + } + + /** {@inheritDoc} */ + @Override public int size() { + return cntrs.groupSize(name); + } + + /** {@inheritDoc} */ + @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) { + for (final Counter counter : rightGroup) + cntrs.findCounter(name, counter.getName()).increment(counter.getValue()); + } + + /** {@inheritDoc} */ + @Override public CounterGroupBase<Counter> getUnderlyingGroup() { + return this; + } + + /** {@inheritDoc} */ + @Override public Iterator<Counter> iterator() { + return cntrs.iterateGroup(name); + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public void readFields(DataInput in) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6dbf2953/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/IgniteHadoopCounters.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/IgniteHadoopCounters.java b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/IgniteHadoopCounters.java new file mode 100644 index 0000000..dd6b2ed --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/IgniteHadoopCounters.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite_new.hadoop.mapreduce; + +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.counters.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.io.*; +import java.util.*; + +/** + * Hadoop Client API Counters adapter. + */ +public class IgniteHadoopCounters extends Counters { + /** */ + private final Map<T2<String,String>,GridHadoopLongCounter> cntrs = new HashMap<>(); + + /** + * Creates new instance based on given counters. + * + * @param cntrs Counters to adapt. + */ + public IgniteHadoopCounters(GridHadoopCounters cntrs) { + for (GridHadoopCounter cntr : cntrs.all()) + if (cntr instanceof GridHadoopLongCounter) + this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (GridHadoopLongCounter) cntr); + } + + /** {@inheritDoc} */ + @Override public synchronized CounterGroup addGroup(CounterGroup grp) { + return addGroup(grp.getName(), grp.getDisplayName()); + } + + /** {@inheritDoc} */ + @Override public CounterGroup addGroup(String name, String displayName) { + return new IgniteHadoopCounterGroup(this, name); + } + + /** {@inheritDoc} */ + @Override public Counter findCounter(String grpName, String cntrName) { + return findCounter(grpName, cntrName, true); + } + + /** {@inheritDoc} */ + @Override public synchronized Counter findCounter(Enum<?> key) { + return findCounter(key.getDeclaringClass().getName(), key.name(), true); + } + + /** {@inheritDoc} */ + @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) { + return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name()); + } + + /** {@inheritDoc} */ + @Override public synchronized Iterable<String> getGroupNames() { + Collection<String> res = new HashSet<>(); + + for (GridHadoopCounter counter : cntrs.values()) + res.add(counter.group()); + + return res; + } + + /** {@inheritDoc} */ + @Override public Iterator<CounterGroup> iterator() { + final Iterator<String> iter = getGroupNames().iterator(); + + return new Iterator<CounterGroup>() { + @Override public boolean hasNext() { + return iter.hasNext(); + } + + @Override public CounterGroup next() { + if (!hasNext()) + throw new NoSuchElementException(); + + return new IgniteHadoopCounterGroup(IgniteHadoopCounters.this, iter.next()); + } + + @Override public void remove() { + throw new UnsupportedOperationException("not implemented"); + } + }; + } + + /** {@inheritDoc} */ + @Override public synchronized CounterGroup getGroup(String grpName) { + return new IgniteHadoopCounterGroup(this, grpName); + } + + /** {@inheritDoc} */ + @Override public synchronized int countCounters() { + return cntrs.size(); + } + + /** {@inheritDoc} */ + @Override public synchronized void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public synchronized void readFields(DataInput in) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) { + for (CounterGroup group : other) { + for (Counter counter : group) { + findCounter(group.getName(), counter.getName()).increment(counter.getValue()); + } + } + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object 
genericRight) {
+        if (!(genericRight instanceof IgniteHadoopCounters))
+            return false;
+
+        return cntrs.equals(((IgniteHadoopCounters)genericRight).cntrs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return cntrs.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setWriteAllCounters(boolean snd) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean getWriteAllCounters() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Limits limits() {
+        return null;
+    }
+
+    /**
+     * Returns the size of a group.
+     *
+     * @param grpName Name of the group.
+     * @return Amount of counters in the given group.
+     */
+    public int groupSize(String grpName) {
+        int res = 0;
+
+        for (GridHadoopCounter counter : cntrs.values()) {
+            if (grpName.equals(counter.group()))
+                res++;
+        }
+
+        return res;
+    }
+
+    /**
+     * Returns a counters iterator for the specified group.
+     *
+     * @param grpName Name of the group to iterate.
+     * @return Counters iterator.
+     */
+    public Iterator<Counter> iterateGroup(String grpName) {
+        Collection<Counter> grpCounters = new ArrayList<>();
+
+        for (GridHadoopLongCounter counter : cntrs.values()) {
+            if (grpName.equals(counter.group()))
+                grpCounters.add(new GridHadoopV2Counter(counter));
+        }
+
+        return grpCounters.iterator();
+    }
+
+    /**
+     * Find a counter in the group.
+     *
+     * @param grpName The name of the counter group.
+     * @param cntrName The name of the counter.
+     * @param create Create the counter if not found if true.
+     * @return The counter that was found or added, or {@code null} if create is false.
+     */
+    public Counter findCounter(String grpName, String cntrName, boolean create) {
+        T2<String, String> key = new T2<>(grpName, cntrName);
+
+        GridHadoopLongCounter internalCntr = cntrs.get(key);
+
+        if (internalCntr == null && create) {
+            internalCntr = new GridHadoopLongCounter(grpName, cntrName);
+
+            // Store the same instance that is wrapped below, so that increments
+            // made through the returned counter are visible in the map.
+            cntrs.put(key, internalCntr);
+        }
+
+        return internalCntr == null ? null : new GridHadoopV2Counter(internalCntr);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6dbf2953/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/protocol/IgniteHadoopClientProtocol.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/protocol/IgniteHadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/protocol/IgniteHadoopClientProtocol.java
new file mode 100644
index 0000000..7244da4
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/protocol/IgniteHadoopClientProtocol.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.ignite_new.hadoop.mapreduce.protocol; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.ipc.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.protocol.*; +import org.apache.hadoop.mapreduce.security.token.delegation.*; +import org.apache.hadoop.mapreduce.v2.*; +import org.apache.hadoop.mapreduce.v2.jobhistory.*; +import org.apache.hadoop.security.*; +import org.apache.hadoop.security.authorize.*; +import org.apache.hadoop.security.token.*; +import org.apache.ignite.*; +import org.apache.ignite.client.hadoop.counter.*; +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.proto.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; + +/** + * Hadoop client protocol. + */ +public class IgniteHadoopClientProtocol implements ClientProtocol { + /** Ignite framework name property. */ + public static final String FRAMEWORK_NAME = "ignite"; + + /** Protocol version. */ + private static final long PROTO_VER = 1L; + + /** Default Ignite system directory. */ + private static final String SYS_DIR = ".ignite/system"; + + /** Configuration. */ + private final Configuration conf; + + /** Ignite client. */ + private volatile GridClient cli; + + /** Last received version. */ + private long lastVer = -1; + + /** Last received status. */ + private GridHadoopJobStatus lastStatus; + + /** + * Constructor. + * + * @param conf Configuration. + * @param cli Ignite client. + */ + IgniteHadoopClientProtocol(Configuration conf, GridClient cli) { + assert cli != null; + + this.conf = conf; + this.cli = cli; + } + + /** {@inheritDoc} */ + @Override public JobID getNewJobID() throws IOException, InterruptedException { + try { + conf.setLong(REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); + + GridHadoopJobId jobID = cli.compute().execute(GridHadoopProtocolNextTaskIdTask.class.getName(), null); + + conf.setLong(RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); + + return new JobID(jobID.globalId().toString(), jobID.localId()); + } + catch (GridClientException e) { + throw new IOException("Failed to get new job ID.", e); + } + } + + /** {@inheritDoc} */ + @Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, + InterruptedException { + try { + conf.setLong(JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis()); + + GridHadoopJobStatus status = cli.compute().execute(GridHadoopProtocolSubmitJobTask.class.getName(), + new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf))); + + if (status == null) + throw new IOException("Failed to submit job (null status obtained): " + jobId); + + return processStatus(status); + } + catch (GridClientException | IgniteCheckedException e) { + throw new IOException("Failed to submit job.", e); + } + } + + /** {@inheritDoc} */ + @Override public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException { + return new ClusterMetrics(0, 0, 0, 0, 0, 0, 1000, 1000, 1, 100, 0, 0); + } + + /** {@inheritDoc} */ + @Override public Cluster.JobTrackerStatus getJobTrackerStatus() throws IOException, InterruptedException { + return Cluster.JobTrackerStatus.RUNNING; + } + + /** {@inheritDoc} */ + @Override 
public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException { + return 0; + } + + /** {@inheritDoc} */ + @Override public AccessControlList getQueueAdmins(String queueName) throws IOException { + return new AccessControlList("*"); + } + + /** {@inheritDoc} */ + @Override public void killJob(JobID jobId) throws IOException, InterruptedException { + try { + cli.compute().execute(GridHadoopProtocolKillJobTask.class.getName(), + new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); + } + catch (GridClientException e) { + throw new IOException("Failed to kill job: " + jobId, e); + } + } + + /** {@inheritDoc} */ + @Override public void setJobPriority(JobID jobid, String priority) throws IOException, InterruptedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException, + InterruptedException { + return false; + } + + /** {@inheritDoc} */ + @Override public JobStatus getJobStatus(JobID jobId) throws IOException, InterruptedException { + try { + Long delay = conf.getLong(GridHadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1); + + GridHadoopProtocolTaskArguments args = delay >= 0 ? + new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) : + new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()); + + GridHadoopJobStatus status = cli.compute().execute(GridHadoopProtocolJobStatusTask.class.getName(), args); + + if (status == null) + throw new IOException("Job tracker doesn't have any information about the job: " + jobId); + + return processStatus(status); + } + catch (GridClientException e) { + throw new IOException("Failed to get job status: " + jobId, e); + } + } + + /** {@inheritDoc} */ + @Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException { + try { + final GridHadoopCounters counters = cli.compute().execute(GridHadoopProtocolJobCountersTask.class.getName(), + new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); + + if (counters == null) + throw new IOException("Job tracker doesn't have any information about the job: " + jobId); + + return new GridHadoopClientCounters(counters); + } + catch (GridClientException e) { + throw new IOException("Failed to get job counters: " + jobId, e); + } + } + + /** {@inheritDoc} */ + @Override public TaskReport[] getTaskReports(JobID jobid, TaskType type) throws IOException, InterruptedException { + return new TaskReport[0]; + } + + /** {@inheritDoc} */ + @Override public String getFilesystemName() throws IOException, InterruptedException { + return FileSystem.get(conf).getUri().toString(); + } + + /** {@inheritDoc} */ + @Override public JobStatus[] getAllJobs() throws IOException, InterruptedException { + return new JobStatus[0]; + } + + /** {@inheritDoc} */ + @Override public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents) + throws IOException, InterruptedException { + return new TaskCompletionEvent[0]; + } + + /** {@inheritDoc} */ + @Override public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException, InterruptedException { + return new String[0]; + } + + /** {@inheritDoc} */ + @Override public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException { + return new TaskTrackerInfo[0]; + } + + /** {@inheritDoc} */ + @Override public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException { + 
return new TaskTrackerInfo[0]; + } + + /** {@inheritDoc} */ + @Override public String getSystemDir() throws IOException, InterruptedException { + Path sysDir = new Path(SYS_DIR); + + return sysDir.toString(); + } + + /** {@inheritDoc} */ + @Override public String getStagingAreaDir() throws IOException, InterruptedException { + String usr = UserGroupInformation.getCurrentUser().getShortUserName(); + + return GridHadoopUtils.stagingAreaDir(conf, usr).toString(); + } + + /** {@inheritDoc} */ + @Override public String getJobHistoryDir() throws IOException, InterruptedException { + return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf); + } + + /** {@inheritDoc} */ + @Override public QueueInfo[] getQueues() throws IOException, InterruptedException { + return new QueueInfo[0]; + } + + /** {@inheritDoc} */ + @Override public QueueInfo getQueue(String queueName) throws IOException, InterruptedException { + return null; + } + + /** {@inheritDoc} */ + @Override public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException { + return new QueueAclsInfo[0]; + } + + /** {@inheritDoc} */ + @Override public QueueInfo[] getRootQueues() throws IOException, InterruptedException { + return new QueueInfo[0]; + } + + /** {@inheritDoc} */ + @Override public QueueInfo[] getChildQueues(String queueName) throws IOException, InterruptedException { + return new QueueInfo[0]; + } + + /** {@inheritDoc} */ + @Override public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException, + InterruptedException { + return null; + } + + /** {@inheritDoc} */ + @Override public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException, + InterruptedException { + return 0; + } + + /** {@inheritDoc} */ + @Override public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException, + InterruptedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) throws IOException, + InterruptedException { + return null; + } + + /** {@inheritDoc} */ + @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { + return PROTO_VER; + } + + /** {@inheritDoc} */ + @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) + throws IOException { + return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash); + } + + /** + * Process received status update. + * + * @param status Ignite status. + * @return Hadoop status. + */ + private JobStatus processStatus(GridHadoopJobStatus status) { + // IMPORTANT! This method will only work in single-threaded environment. It is valid at the moment because + // GridHadoopClientProtocolProvider creates new instance of this class for every new job and Job class + // serializes invocations of submitJob() and getJobStatus() methods. However, if any of these conditions will + // change in future and either protocol will serve statuses for several jobs or status update will not be + // serialized anymore, then we have to fallback to concurrent approach (e.g. using ConcurrentHashMap). 
+ // (vozerov) + if (lastVer < status.version()) { + lastVer = status.version(); + + lastStatus = status; + } + else + assert lastStatus != null; + + return GridHadoopUtils.status(lastStatus, conf); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6dbf2953/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/protocol/IgniteHadoopClientProtocolProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/protocol/IgniteHadoopClientProtocolProvider.java b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/protocol/IgniteHadoopClientProtocolProvider.java new file mode 100644 index 0000000..d2fe28e --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/protocol/IgniteHadoopClientProtocolProvider.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite_new.hadoop.mapreduce.protocol; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.protocol.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.client.marshaller.optimized.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.client.hadoop.GridHadoopClientProtocol.*; +import static org.apache.ignite.internal.client.GridClientProtocol.*; + + +/** + * Grid Hadoop client protocol provider. + */ +public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider { + /** Clients. 
*/ + private static final ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = new ConcurrentHashMap<>(); + + /** {@inheritDoc} */ + @Override public ClientProtocol create(Configuration conf) throws IOException { + if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) { + String addr = conf.get(MRConfig.MASTER_ADDRESS); + + if (F.isEmpty(addr)) + throw new IOException("Failed to create client protocol because server address is not specified (is " + + MRConfig.MASTER_ADDRESS + " property set?)."); + + if (F.eq(addr, "local")) + throw new IOException("Local execution mode is not supported, please point " + + MRConfig.MASTER_ADDRESS + " to real Ignite node."); + + return createProtocol(addr, conf); + } + + return null; + } + + /** {@inheritDoc} */ + @Override public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException { + if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) + return createProtocol(addr.getHostString() + ":" + addr.getPort(), conf); + + return null; + } + + /** {@inheritDoc} */ + @Override public void close(ClientProtocol cliProto) throws IOException { + // No-op. + } + + /** + * Internal protocol creation routine. + * + * @param addr Address. + * @param conf Configuration. + * @return Client protocol. + * @throws java.io.IOException If failed. + */ + private static ClientProtocol createProtocol(String addr, Configuration conf) throws IOException { + return new IgniteHadoopClientProtocol(conf, client(addr)); + } + + /** + * Create client. + * + * @param addr Endpoint address. + * @return Client. + * @throws java.io.IOException If failed. + */ + private static GridClient client(String addr) throws IOException { + try { + IgniteInternalFuture<GridClient> fut = cliMap.get(addr); + + if (fut == null) { + GridFutureAdapter<GridClient> fut0 = new GridFutureAdapter<>(); + + IgniteInternalFuture<GridClient> oldFut = cliMap.putIfAbsent(addr, fut0); + + if (oldFut != null) + return oldFut.get(); + else { + GridClientConfiguration cliCfg = new GridClientConfiguration(); + + cliCfg.setProtocol(TCP); + cliCfg.setServers(Collections.singletonList(addr)); + cliCfg.setMarshaller(new GridClientOptimizedMarshaller()); + cliCfg.setDaemon(true); + + try { + GridClient cli = GridClientFactory.start(cliCfg); + + fut0.onDone(cli); + + return cli; + } + catch (GridClientException e) { + fut0.onDone(e); + + throw new IOException("Failed to establish connection with Ignite node: " + addr, e); + } + } + } + else + return fut.get(); + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to establish connection with Ignite node: " + addr, e); + } + } +}
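
Finally, a sketch of how the provider above is reached during job submission. Hadoop's Cluster class discovers ClientProtocolProvider implementations via the JDK ServiceLoader, so the module is assumed to ship a META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider entry naming IgniteHadoopClientProtocolProvider; create(conf) then matches on the framework name. The class name, node address and paths below are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IgniteSubmitSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // MRConfig.FRAMEWORK_NAME: must equal the protocol's FRAMEWORK_NAME ("ignite").
        conf.set("mapreduce.framework.name", "ignite");

        // MRConfig.MASTER_ADDRESS: a hypothetical Ignite node endpoint; "local" is rejected.
        conf.set("mapreduce.jobtracker.address", "127.0.0.1:11211");

        Job job = Job.getInstance(conf, "sample");

        job.setJarByClass(IgniteSubmitSketch.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Hypothetical IGFS paths; any Hadoop-visible file system would work here.
        FileInputFormat.addInputPath(job, new Path("igfs://ipc/input"));
        FileOutputFormat.setOutputPath(job, new Path("igfs://ipc/output"));

        // Job.submit() asks each registered ClientProtocolProvider to create a protocol;
        // the provider above matches on the framework name and returns
        // IgniteHadoopClientProtocol, which then drives submitJob()/getJobStatus().
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Since clients are cached per address in cliMap, repeated submissions from the same JVM to the same Ignite node reuse a single GridClient connection.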