http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index d0a327e..2e855d0 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -89,7 +89,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes. synchronized (HadoopDefaultJobInfo.class) { if ((jobCls0 = jobCls) == null) { - HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-main"); + HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job"); jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName()); }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index 00be422..68a9ef6 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -26,10 +26,16 @@ import org.apache.hadoop.mapreduce.JobPriority; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.*; import org.apache.ignite.*; +import org.apache.ignite.hadoop.fs.v1.*; +import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; import java.io.*; +import java.net.*; import java.util.*; /** @@ -57,6 +63,41 @@ public class HadoopUtils { /** Old reducer class attribute. */ private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class"; + /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ + private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>( + new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() { + @Override public FileSystem createValue(FsCacheKey key) { + try { + assert key != null; + + // Explicitly disable FileSystem caching: + URI uri = key.uri(); + + String scheme = uri.getScheme(); + + // Copy the configuration to avoid altering the external object. + Configuration cfg = new Configuration(key.configuration()); + + String prop = HadoopUtils.disableFsCachePropertyName(scheme); + + cfg.setBoolean(prop, true); + + return FileSystem.get(uri, cfg, key.user()); + } + catch (IOException | InterruptedException ioe) { + throw new IgniteException(ioe); + } + } + } + ); + + /** + * Constructor. + */ + private HadoopUtils() { + // No-op. + } + /** * Wraps native split. * @@ -126,11 +167,13 @@ public class HadoopUtils { break; case PHASE_REDUCE: - assert status.totalReducerCnt() > 0; - setupProgress = 1; mapProgress = 1; - reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt(); + + if (status.totalReducerCnt() > 0) + reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt(); + else + reduceProgress = 1f; break; @@ -300,9 +343,242 @@ public class HadoopUtils { } /** - * Constructor. + * Creates {@link Configuration} in a correct class loader context to avoid caching + * of inappropriate class loader in the Configuration object. + * @return New instance of {@link Configuration}. */ - private HadoopUtils() { - // No-op. + public static Configuration safeCreateConfiguration() { + final ClassLoader cl0 = Thread.currentThread().getContextClassLoader(); + + Thread.currentThread().setContextClassLoader(Configuration.class.getClassLoader()); + + try { + return new Configuration(); + } + finally { + Thread.currentThread().setContextClassLoader(cl0); + } + } + + /** + * Creates {@link JobConf} in a correct class loader context to avoid caching + * of inappropriate class loader in the Configuration object. + * @return New instance of {@link JobConf}. + */ + public static JobConf safeCreateJobConf() { + final ClassLoader cl0 = Thread.currentThread().getContextClassLoader(); + + Thread.currentThread().setContextClassLoader(JobConf.class.getClassLoader()); + + try { + return new JobConf(); + } + finally { + Thread.currentThread().setContextClassLoader(cl0); + } + } + + /** + * Gets non-null user name as per the Hadoop viewpoint. + * @param cfg the Hadoop job configuration, may be null. + * @return the user name, never null. + */ + private static String getMrHadoopUser(Configuration cfg) throws IOException { + String user = cfg.get(MRJobConfig.USER_NAME); + + if (user == null) + user = IgniteHadoopFileSystem.getFsHadoopUser(); + + return user; + } + + /** + * Common method to get the V1 file system in MapRed engine. + * It creates the filesystem for the user specified in the + * configuration with {@link MRJobConfig#USER_NAME} property. + * @param uri the file system uri. + * @param cfg the configuration. + * @return the file system + * @throws IOException + */ + public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, boolean doCacheFs) throws IOException { + final String usr = getMrHadoopUser(cfg); + + assert usr != null; + + if (uri == null) + uri = FileSystem.getDefaultUri(cfg); + + final FileSystem fs; + + if (doCacheFs) { + try { + fs = getWithCaching(uri, cfg, usr); + } + catch (IgniteException ie) { + throw new IOException(ie); + } + } + else { + try { + fs = FileSystem.get(uri, cfg, usr); + } + catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + + throw new IOException(ie); + } + } + + assert fs != null; + assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user()); + + return fs; + } + + /** + * Note that configuration is not a part of the key. + * It is used solely to initialize the first instance + * that is created for the key. + */ + public static final class FsCacheKey { + /** */ + private final URI uri; + + /** */ + private final String usr; + + /** */ + private final String equalityKey; + + /** */ + private final Configuration cfg; + + /** + * Constructor + */ + public FsCacheKey(URI uri, String usr, Configuration cfg) { + assert uri != null; + assert usr != null; + assert cfg != null; + + this.uri = fixUri(uri, cfg); + this.usr = usr; + this.cfg = cfg; + + this.equalityKey = createEqualityKey(); + } + + /** + * Creates String key used for equality and hashing. + */ + private String createEqualityKey() { + GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@"); + + if (uri.getScheme() != null) + sb.a(uri.getScheme().toLowerCase()); + + sb.a("://"); + + if (uri.getAuthority() != null) + sb.a(uri.getAuthority().toLowerCase()); + + return sb.toString(); + } + + /** + * The URI. + */ + public URI uri() { + return uri; + } + + /** + * The User. + */ + public String user() { + return usr; + } + + /** + * The Configuration. + */ + public Configuration configuration() { + return cfg; + } + + /** {@inheritDoc} */ + @SuppressWarnings("SimplifiableIfStatement") + @Override public boolean equals(Object obj) { + if (obj == this) + return true; + + if (obj == null || getClass() != obj.getClass()) + return false; + + return equalityKey.equals(((FsCacheKey)obj).equalityKey); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return equalityKey.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return equalityKey; + } + } + + /** + * Gets FileSystem caching it in static Ignite cache. The cache is a singleton + * for each class loader. + * + * <p/>Note that the file systems in the cache are keyed by a triplet {scheme, authority, user}. + * The Configuration is not a part of the key. This means that for the given key file system is + * initialized only once with the Configuration passed in upon the file system creation. + * + * @param uri The file system URI. + * @param cfg The configuration. + * @param usr The user to create file system for. + * @return The file system: either created, or taken from the cache. + */ + private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) { + FsCacheKey key = new FsCacheKey(uri, usr, cfg); + + return fileSysLazyMap.getOrCreate(key); + } + + /** + * Gets the property name to disable file system cache. + * @param scheme The file system URI scheme. + * @return The property name. If scheme is null, + * returns "fs.null.impl.disable.cache". + */ + public static String disableFsCachePropertyName(@Nullable String scheme) { + return String.format("fs.%s.impl.disable.cache", scheme); + } + + /** + * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3). + * @param uri0 The uri. + * @param cfg The cfg. + * @return Correct URI. + */ + public static URI fixUri(URI uri0, Configuration cfg) { + if (uri0 == null) + return FileSystem.getDefaultUri(cfg); + + String scheme = uri0.getScheme(); + String authority = uri0.getAuthority(); + + if (authority == null) { + URI dfltUri = FileSystem.getDefaultUri(cfg); + + if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null)) + return dfltUri; + } + + return uri0; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java index 27805f8..dd679de 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java @@ -19,26 +19,26 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; +import org.apache.hadoop.security.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import java.io.*; import java.net.*; +import java.security.*; /** * Encapsulates logic of secondary filesystem creation. */ public class SecondaryFileSystemProvider { /** Configuration of the secondary filesystem, never null. */ - private final Configuration cfg = new Configuration(); + private final Configuration cfg = HadoopUtils.safeCreateConfiguration(); /** The secondary filesystem URI, never null. */ private final URI uri; - /** Optional user name to log into secondary filesystem with. */ - private @Nullable final String userName; - /** * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be * specified either explicitly or in the configuration provided. @@ -47,13 +47,10 @@ public class SecondaryFileSystemProvider { * property in the provided configuration. * @param secConfPath the secondary Fs path (file path on the local file system, optional). * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path resolved. - * @param userName User name. * @throws IOException */ public SecondaryFileSystemProvider(final @Nullable String secUri, - final @Nullable String secConfPath, @Nullable String userName) throws IOException { - this.userName = userName; - + final @Nullable String secConfPath) throws IOException { if (secConfPath != null) { URL url = U.resolveIgniteUrl(secConfPath); @@ -79,7 +76,7 @@ public class SecondaryFileSystemProvider { } // Disable caching: - String prop = String.format("fs.%s.impl.disable.cache", uri.getScheme()); + String prop = HadoopUtils.disableFsCachePropertyName(uri.getScheme()); cfg.setBoolean(prop, true); } @@ -88,20 +85,18 @@ public class SecondaryFileSystemProvider { * @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs. * @throws IOException */ - public FileSystem createFileSystem() throws IOException { + public FileSystem createFileSystem(String userName) throws IOException { + userName = IgfsUtils.fixUserName(userName); + final FileSystem fileSys; - if (userName == null) - fileSys = FileSystem.get(uri, cfg); - else { - try { - fileSys = FileSystem.get(uri, cfg, userName); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); + try { + fileSys = FileSystem.get(uri, cfg, userName); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); - throw new IOException("Failed to create file system due to interrupt.", e); - } + throw new IOException("Failed to create file system due to interrupt.", e); } return fileSys; @@ -109,10 +104,26 @@ public class SecondaryFileSystemProvider { /** * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs. - * @throws IOException + * @throws IOException in case of error. */ - public AbstractFileSystem createAbstractFileSystem() throws IOException { - return AbstractFileSystem.get(uri, cfg); + public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException { + userName = IgfsUtils.fixUserName(userName); + + String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); + + UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName); + + try { + return ugi.doAs(new PrivilegedExceptionAction<AbstractFileSystem>() { + @Override public AbstractFileSystem run() throws IOException { + return AbstractFileSystem.get(uri, cfg); + } + }); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + + throw new IOException("Failed to create file system due to interrupt.", ie); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java deleted file mode 100644 index 509f443..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.hdfs.*; -import org.apache.hadoop.mapreduce.*; - -import java.io.*; -import java.net.*; - -import static org.apache.ignite.configuration.FileSystemConfiguration.*; - -/** - * Wrapper of HDFS for support of separated working directory. - */ -public class HadoopDistributedFileSystem extends DistributedFileSystem { - /** User name for each thread. */ - private final ThreadLocal<String> userName = new ThreadLocal<String>() { - /** {@inheritDoc} */ - @Override protected String initialValue() { - return DFLT_USER_NAME; - } - }; - - /** Working directory for each thread. */ - private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() { - /** {@inheritDoc} */ - @Override protected Path initialValue() { - return getHomeDirectory(); - } - }; - - /** {@inheritDoc} */ - @Override public void initialize(URI uri, Configuration conf) throws IOException { - super.initialize(uri, conf); - - setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME)); - } - - /** - * Set user name and default working directory for current thread. - * - * @param userName User name. - */ - public void setUser(String userName) { - this.userName.set(userName); - - setWorkingDirectory(getHomeDirectory()); - } - - /** {@inheritDoc} */ - @Override public Path getHomeDirectory() { - Path path = new Path("/user/" + userName.get()); - - return path.makeQualified(getUri(), null); - } - - /** {@inheritDoc} */ - @Override public void setWorkingDirectory(Path dir) { - Path fixedDir = fixRelativePart(dir); - - String res = fixedDir.toUri().getPath(); - - if (!DFSUtil.isValidName(res)) - throw new IllegalArgumentException("Invalid DFS directory name " + res); - - workingDir.set(fixedDir); - } - - /** {@inheritDoc} */ - @Override public Path getWorkingDirectory() { - return workingDir.get(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java index f3f51d4..d90bc28 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java @@ -19,8 +19,6 @@ package org.apache.ignite.internal.processors.hadoop.fs; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; -import org.apache.hadoop.hdfs.protocol.*; -import org.apache.ignite.hadoop.fs.v1.*; /** * Utilities for configuring file systems to support the separate working directory per each thread. @@ -30,19 +28,6 @@ public class HadoopFileSystemsUtils { public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir"; /** - * Set user name and default working directory for current thread if it's supported by file system. - * - * @param fs File system. - * @param userName User name. - */ - public static void setUser(FileSystem fs, String userName) { - if (fs instanceof IgniteHadoopFileSystem) - ((IgniteHadoopFileSystem)fs).setUser(userName); - else if (fs instanceof HadoopDistributedFileSystem) - ((HadoopDistributedFileSystem)fs).setUser(userName); - } - - /** * Setup wrappers of filesystems to support the separate working directory. * * @param cfg Config for setup. @@ -51,7 +36,5 @@ public class HadoopFileSystemsUtils { cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName()); cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV2.class.getName()); - - cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java new file mode 100644 index 0000000..71b38c4 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.future.*; +import org.jsr166.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.*; + +/** + * Maps values by keys. + * Values are created lazily using {@link ValueFactory}. + * + * Despite of the name, does not depend on any Hadoop classes. + */ +public class HadoopLazyConcurrentMap<K, V extends Closeable> { + /** The map storing the actual values. */ + private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>(); + + /** The factory passed in by the client. Will be used for lazy value creation. */ + private final ValueFactory<K, V> factory; + + /** Lock used to close the objects. */ + private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); + + /** Flag indicating that this map is closed and cleared. */ + private boolean closed; + + /** + * Constructor. + * @param factory the factory to create new values lazily. + */ + public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) { + this.factory = factory; + } + + /** + * Gets cached or creates a new value of V. + * Never returns null. + * @param k the key to associate the value with. + * @return the cached or newly created value, never null. + * @throws IgniteException on error + */ + public V getOrCreate(K k) { + ValueWrapper w = map.get(k); + + if (w == null) { + closeLock.readLock().lock(); + + try { + if (closed) + throw new IllegalStateException("Failed to create value for key [" + k + + "]: the map is already closed."); + + final ValueWrapper wNew = new ValueWrapper(k); + + w = map.putIfAbsent(k, wNew); + + if (w == null) { + wNew.init(); + + w = wNew; + } + } + finally { + closeLock.readLock().unlock(); + } + } + + try { + V v = w.getValue(); + + assert v != null; + + return v; + } + catch (IgniteCheckedException ie) { + throw new IgniteException(ie); + } + } + + /** + * Clears the map and closes all the values. + */ + public void close() throws IgniteCheckedException { + closeLock.writeLock().lock(); + + try { + closed = true; + + Exception err = null; + + Set<K> keySet = map.keySet(); + + for (K key : keySet) { + V v = null; + + try { + v = map.get(key).getValue(); + } + catch (IgniteCheckedException ignore) { + // No-op. + } + + if (v != null) { + try { + v.close(); + } + catch (Exception err0) { + if (err == null) + err = err0; + } + } + } + + map.clear(); + + if (err != null) + throw new IgniteCheckedException(err); + } + finally { + closeLock.writeLock().unlock(); + } + } + + /** + * Helper class that drives the lazy value creation. + */ + private class ValueWrapper { + /** Future. */ + private final GridFutureAdapter<V> fut = new GridFutureAdapter<>(); + + /** the key */ + private final K key; + + /** + * Creates new wrapper. + */ + private ValueWrapper(K key) { + this.key = key; + } + + /** + * Initializes the value using the factory. + */ + private void init() { + try { + final V v0 = factory.createValue(key); + + if (v0 == null) + throw new IgniteException("Failed to create non-null value. [key=" + key + ']'); + + fut.onDone(v0); + } + catch (Throwable e) { + fut.onDone(e); + } + } + + /** + * Gets the available value or blocks until the value is initialized. + * @return the value, never null. + * @throws IgniteCheckedException on error. + */ + V getValue() throws IgniteCheckedException { + return fut.get(); + } + } + + /** + * Interface representing the factory that creates map values. + * @param <K> the type of the key. + * @param <V> the type of the value. + */ + public interface ValueFactory <K, V> { + /** + * Creates the new value. Should never return null. + * + * @param key the key to create value for + * @return the value. + * @throws IgniteException on failure. + */ + public V createValue(K key); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java index 2f19226..b9c5113 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java @@ -85,4 +85,10 @@ public interface HadoopIgfsEx extends HadoopIgfs { * @throws IOException If failed. */ public void flush(HadoopIgfsStreamDelegate delegate) throws IOException; + + /** + * The user this Igfs instance works on behalf of. + * @return the user name. + */ + public String user(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java index 44e531e..47ba0e8 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java @@ -23,6 +23,7 @@ import org.apache.ignite.igfs.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import java.io.*; @@ -46,25 +47,35 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** Logger. */ private final Log log; + /** The user this Igfs works on behalf of. */ + private final String user; + /** * Constructor. * * @param igfs Target IGFS. * @param log Log. */ - public HadoopIgfsInProc(IgfsEx igfs, Log log) { + public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException { + this.user = IgfsUtils.fixUserName(userName); + this.igfs = igfs; + this.log = log; bufSize = igfs.configuration().getBlockSize() * 2; } /** {@inheritDoc} */ - @Override public IgfsHandshakeResponse handshake(String logDir) { - igfs.clientLogDirectory(logDir); + @Override public IgfsHandshakeResponse handshake(final String logDir) { + return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsHandshakeResponse>() { + @Override public IgfsHandshakeResponse apply() { + igfs.clientLogDirectory(logDir); - return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(), - igfs.globalSampling()); + return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(), + igfs.globalSampling()); + } + }); } /** {@inheritDoc} */ @@ -82,9 +93,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException { + @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException { try { - return igfs.info(path); + return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() { + @Override public IgfsFile apply() { + return igfs.info(path); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -95,9 +110,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException { try { - return igfs.update(path, props); + return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() { + @Override public IgfsFile apply() { + return igfs.update(path, props); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -108,9 +127,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { + @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) throws IgniteCheckedException { try { - igfs.setTimes(path, accessTime, modificationTime); + IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() { + @Override public Void apply() { + igfs.setTimes(path, accessTime, modificationTime); + + return null; + } + }); return true; } @@ -124,9 +149,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException { + @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException { try { - igfs.rename(src, dest); + IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() { + @Override public Void apply() { + igfs.rename(src, dest); + + return null; + } + }); return true; } @@ -139,9 +170,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException { + @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException { try { - return igfs.delete(path, recursive); + return IgfsUserContext.doAs(user, new IgniteOutClosure<Boolean>() { + @Override public Boolean apply() { + return igfs.delete(path, recursive); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -154,18 +189,32 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** {@inheritDoc} */ @Override public IgfsStatus fsStatus() throws IgniteCheckedException { try { - return igfs.globalSpace(); + return IgfsUserContext.doAs(user, new Callable<IgfsStatus>() { + @Override public IgfsStatus call() throws IgniteCheckedException { + return igfs.globalSpace(); + } + }); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " + "stopping."); } + catch (IgniteCheckedException | RuntimeException | Error e) { + throw e; + } + catch (Exception e) { + throw new AssertionError("Must never go there."); + } } /** {@inheritDoc} */ - @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException { + @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IgniteCheckedException { try { - return igfs.listPaths(path); + return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsPath>>() { + @Override public Collection<IgfsPath> apply() { + return igfs.listPaths(path); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -176,9 +225,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException { + @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IgniteCheckedException { try { - return igfs.listFiles(path); + return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsFile>>() { + @Override public Collection<IgfsFile> apply() { + return igfs.listFiles(path); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -189,9 +242,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException { try { - igfs.mkdirs(path, props); + IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() { + @Override public Void apply() { + igfs.mkdirs(path, props); + + return null; + } + }); return true; } @@ -205,9 +264,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException { + @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException { try { - return igfs.summary(path); + return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsPathSummary>() { + @Override public IgfsPathSummary apply() { + return igfs.summary(path); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -219,10 +282,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) + @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, final long len) throws IgniteCheckedException { try { - return igfs.affinity(path, start, len); + return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsBlockLocation>>() { + @Override public Collection<IgfsBlockLocation> apply() { + return igfs.affinity(path, start, len); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -233,11 +300,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException { + @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException { try { - IgfsInputStreamAdapter stream = igfs.open(path, bufSize); + return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply() { + IgfsInputStreamAdapter stream = igfs.open(path, bufSize); - return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length()); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length()); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -248,12 +319,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) + @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch) throws IgniteCheckedException { try { - IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); + return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply() { + IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); - return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length()); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length()); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -264,13 +339,17 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, - int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException { + @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, final boolean colocate, + final int replication, final long blockSize, final @Nullable Map<String, String> props) throws IgniteCheckedException { try { - IgfsOutputStream stream = igfs.create(path, bufSize, overwrite, - colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props); + return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply() { + IgfsOutputStream stream = igfs.create(path, bufSize, overwrite, + colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props); - return new HadoopIgfsStreamDelegate(this, stream); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -281,12 +360,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create, - @Nullable Map<String, String> props) throws IgniteCheckedException { + @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create, + final @Nullable Map<String, String> props) throws IgniteCheckedException { try { - IgfsOutputStream stream = igfs.append(path, bufSize, create, props); + return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply() { + IgfsOutputStream stream = igfs.append(path, bufSize, create, props); - return new HadoopIgfsStreamDelegate(this, stream); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -407,4 +490,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { if (lsnr0 != null && log.isDebugEnabled()) log.debug("Removed stream event listener [delegate=" + delegate + ']'); } + + /** {@inheritDoc} */ + @Override public String user() { + return user; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java index 0264e7b..3561e95 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java @@ -41,7 +41,7 @@ import java.util.concurrent.locks.*; @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") public class HadoopIgfsIpcIo implements HadoopIgfsIo { /** Logger. */ - private Log log; + private final Log log; /** Request futures map. */ private ConcurrentMap<Long, HadoopIgfsFuture> reqMap = http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java index 7dca049..f23c62c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java @@ -81,6 +81,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener /** IGFS name. */ private final String igfs; + /** The user this out proc is performing on behalf of. */ + private final String userName; + /** Client log. */ private final Log log; @@ -100,8 +103,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener * @param log Client logger. * @throws IOException If failed. */ - public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log) throws IOException { - this(host, port, grid, igfs, false, log); + public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log, String user) throws IOException { + this(host, port, grid, igfs, false, log, user); } /** @@ -113,8 +116,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener * @param log Client logger. * @throws IOException If failed. */ - public HadoopIgfsOutProc(int port, String grid, String igfs, Log log) throws IOException { - this(null, port, grid, igfs, true, log); + public HadoopIgfsOutProc(int port, String grid, String igfs, Log log, String user) throws IOException { + this(null, port, grid, igfs, true, log, user); } /** @@ -128,7 +131,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener * @param log Client logger. * @throws IOException If failed. */ - private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log) + private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log, String user) throws IOException { assert host != null && !shmem || host == null && shmem : "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']'; @@ -138,6 +141,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener this.grid = grid; this.igfs = igfs; this.log = log; + this.userName = IgfsUtils.fixUserName(user); io = HadoopIgfsIpcIo.get(log, endpoint); @@ -173,6 +177,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(INFO); msg.path(path); + msg.userName(userName); return io.send(msg).chain(FILE_RES).get(); } @@ -184,6 +189,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(UPDATE); msg.path(path); msg.properties(props); + msg.userName(userName); return io.send(msg).chain(FILE_RES).get(); } @@ -196,6 +202,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.path(path); msg.accessTime(accessTime); msg.modificationTime(modificationTime); + msg.userName(userName); return io.send(msg).chain(BOOL_RES).get(); } @@ -207,6 +214,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(RENAME); msg.path(src); msg.destinationPath(dest); + msg.userName(userName); return io.send(msg).chain(BOOL_RES).get(); } @@ -218,6 +226,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(DELETE); msg.path(path); msg.flag(recursive); + msg.userName(userName); return io.send(msg).chain(BOOL_RES).get(); } @@ -231,6 +240,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.path(path); msg.start(start); msg.length(len); + msg.userName(userName); return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get(); } @@ -241,6 +251,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(PATH_SUMMARY); msg.path(path); + msg.userName(userName); return io.send(msg).chain(SUMMARY_RES).get(); } @@ -252,6 +263,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(MAKE_DIRECTORIES); msg.path(path); msg.properties(props); + msg.userName(userName); return io.send(msg).chain(BOOL_RES).get(); } @@ -262,6 +274,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(LIST_FILES); msg.path(path); + msg.userName(userName); return io.send(msg).chain(FILE_COL_RES).get(); } @@ -272,6 +285,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(LIST_PATHS); msg.path(path); + msg.userName(userName); return io.send(msg).chain(PATH_COL_RES).get(); } @@ -288,6 +302,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(OPEN_READ); msg.path(path); msg.flag(false); + msg.userName(userName); IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); @@ -303,6 +318,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.path(path); msg.flag(true); msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch); + msg.userName(userName); IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); @@ -321,6 +337,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.properties(props); msg.replication(replication); msg.blockSize(blockSize); + msg.userName(userName); Long streamId = io.send(msg).chain(LONG_RES).get(); @@ -336,6 +353,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.path(path); msg.flag(create); msg.properties(props); + msg.userName(userName); Long streamId = io.send(msg).chain(LONG_RES).get(); @@ -471,4 +489,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener } }; } + + /** {@inheritDoc} */ + @Override public String user() { + return userName; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java index 1dada21..7d0db49 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java @@ -55,6 +55,9 @@ public class HadoopIgfsWrapper implements HadoopIgfs { /** Logger. */ private final Log log; + /** The user name this wrapper works on behalf of. */ + private final String userName; + /** * Constructor. * @@ -63,13 +66,15 @@ public class HadoopIgfsWrapper implements HadoopIgfs { * @param conf Configuration. * @param log Current logger. */ - public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException { + public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user) + throws IOException { try { this.authority = authority; this.endpoint = new HadoopIgfsEndpoint(authority); this.logDir = logDir; this.conf = conf; this.log = log; + this.userName = user; } catch (IgniteCheckedException e) { throw new IOException("Failed to parse endpoint: " + authority, e); @@ -362,13 +367,14 @@ public class HadoopIgfsWrapper implements HadoopIgfs { HadoopIgfsEx hadoop = null; try { - hadoop = new HadoopIgfsInProc(igfs, log); + hadoop = new HadoopIgfsInProc(igfs, log, userName); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } catch (IOException | IgniteCheckedException e) { if (e instanceof HadoopIgfsCommunicationException) - hadoop.close(true); + if (hadoop != null) + hadoop.close(true); if (log.isDebugEnabled()) log.debug("Failed to connect to in-proc IGFS, fallback to IPC mode.", e); @@ -384,7 +390,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs { HadoopIgfsEx hadoop = null; try { - hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log); + hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } @@ -409,7 +415,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs { try { hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(), - log); + log, userName); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } @@ -430,7 +436,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs { HadoopIgfsEx hadoop = null; try { - hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), log); + hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), + log, userName); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java index 2e04ac1..b170125 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java @@ -99,6 +99,22 @@ public abstract class HadoopRunnableTask implements Callable<Void> { /** {@inheritDoc} */ @Override public Void call() throws IgniteCheckedException { + ctx = job.getTaskContext(info); + + return ctx.runAsJobOwner(new Callable<Void>() { + @Override public Void call() throws Exception { + call0(); + + return null; + } + }); + } + + /** + * Implements actual task running. + * @throws IgniteCheckedException + */ + void call0() throws IgniteCheckedException { execStartTs = U.currentTimeMillis(); Throwable err = null; @@ -108,8 +124,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> { HadoopPerformanceCounter perfCntr = null; try { - ctx = job.getTaskContext(info); - perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId); perfCntr.onTaskSubmit(info, submitTs); @@ -156,8 +170,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> { if (ctx != null) ctx.cleanupTaskEnvironment(); } - - return null; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index d265ca8..d754039 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.hadoop.v2; import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.JobID; @@ -68,7 +67,7 @@ public class HadoopV2Job implements HadoopJob { new ConcurrentHashMap8<>(); /** Pooling task context class and thus class loading environment. */ - private final Queue<Class<?>> taskCtxClsPool = new ConcurrentLinkedQueue<>(); + private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = new ConcurrentLinkedQueue<>(); /** All created contexts. */ private final Queue<Class<?>> fullCtxClsQueue = new ConcurrentLinkedDeque<>(); @@ -93,12 +92,7 @@ public class HadoopV2Job implements HadoopJob { hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId()); - HadoopClassLoader clsLdr = (HadoopClassLoader)getClass().getClassLoader(); - - // Before create JobConf instance we should set new context class loader. - Thread.currentThread().setContextClassLoader(clsLdr); - - jobConf = new JobConf(); + jobConf = HadoopUtils.safeCreateJobConf(); HadoopFileSystemsUtils.setupFileSystems(jobConf); @@ -139,7 +133,9 @@ public class HadoopV2Job implements HadoopJob { Path jobDir = new Path(jobDirPath); - try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf)) { + try { + FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, true); + JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf, jobDir); @@ -197,7 +193,7 @@ public class HadoopV2Job implements HadoopJob { if (old != null) return old.get(); - Class<?> cls = taskCtxClsPool.poll(); + Class<? extends HadoopTaskContext> cls = taskCtxClsPool.poll(); try { if (cls == null) { @@ -205,9 +201,9 @@ public class HadoopV2Job implements HadoopJob { // Note that the classloader identified by the task it was initially created for, // but later it may be reused for other tasks. HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(), - "hadoop-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber()); + "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber()); - cls = ldr.loadClass(HadoopV2TaskContext.class.getName()); + cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName()); fullCtxClsQueue.add(cls); } @@ -325,7 +321,14 @@ public class HadoopV2Job implements HadoopJob { /** {@inheritDoc} */ @Override public void cleanupStagingDirectory() { - if (rsrcMgr != null) - rsrcMgr.cleanupStagingDirectory(); + rsrcMgr.cleanupStagingDirectory(); + } + + /** + * Getter for job configuration. + * @return The job configuration. + */ + public JobConf jobConf() { + return jobConf; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java index 6f6bfa1..2f64e77 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -40,6 +40,9 @@ import java.util.*; * files are needed to be placed on local files system. */ public class HadoopV2JobResourceManager { + /** File type Fs disable caching property name. */ + private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = HadoopUtils.disableFsCachePropertyName("file"); + /** Hadoop job context. */ private final JobContextImpl ctx; @@ -84,7 +87,7 @@ public class HadoopV2JobResourceManager { try { cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath()); - if(!cfg.getBoolean("fs.file.impl.disable.cache", false)) + if (!cfg.getBoolean(FILE_DISABLE_CACHING_PROPERTY_NAME, false)) FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath())); } finally { @@ -112,15 +115,17 @@ public class HadoopV2JobResourceManager { stagingDir = new Path(new URI(mrDir)); if (download) { - FileSystem fs = FileSystem.get(stagingDir.toUri(), cfg); + FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg, true); if (!fs.exists(stagingDir)) - throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " + - stagingDir); + throw new IgniteCheckedException("Failed to find map-reduce submission " + + "directory (does not exist): " + stagingDir); if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg)) - throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " + - "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']'); + throw new IgniteCheckedException("Failed to copy job submission directory " + + "contents to local file system " + + "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + + ", jobId=" + jobId + ']'); } File jarJobFile = new File(jobLocDir, "job.jar"); @@ -144,7 +149,8 @@ public class HadoopV2JobResourceManager { } } else if (!jobLocDir.mkdirs()) - throw new IgniteCheckedException("Failed to create local job directory: " + jobLocDir.getAbsolutePath()); + throw new IgniteCheckedException("Failed to create local job directory: " + + jobLocDir.getAbsolutePath()); setLocalFSWorkingDirectory(jobLocDir); } @@ -204,14 +210,14 @@ public class HadoopV2JobResourceManager { FileSystem dstFs = FileSystem.getLocal(cfg); - FileSystem srcFs = srcPath.getFileSystem(cfg); + FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, true); if (extract) { File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives"); if (!archivesPath.exists() && !archivesPath.mkdir()) throw new IOException("Failed to create directory " + - "[path=" + archivesPath + ", jobId=" + jobId + ']'); + "[path=" + archivesPath + ", jobId=" + jobId + ']'); File archiveFile = new File(archivesPath, locName); @@ -287,7 +293,7 @@ public class HadoopV2JobResourceManager { public void cleanupStagingDirectory() { try { if (stagingDir != null) - stagingDir.getFileSystem(ctx.getJobConf()).delete(stagingDir, true); + HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), true).delete(stagingDir, true); } catch (Exception e) { log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index e9c859bd..e89feba 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -28,17 +28,21 @@ import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapred.TaskID; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.security.*; import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.hadoop.v1.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import java.io.*; +import java.security.*; import java.util.*; +import java.util.concurrent.*; import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*; import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; @@ -239,9 +243,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { Thread.currentThread().setContextClassLoader(jobConf().getClassLoader()); try { - FileSystem fs = FileSystem.get(jobConf()); - - HadoopFileSystemsUtils.setUser(fs, jobConf().getUser()); + FileSystem.get(jobConf()); LocalFileSystem locFs = FileSystem.getLocal(jobConf()); @@ -421,7 +423,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException { Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR)); - try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf()); + try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), false); FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) { in.seek(split.offset()); @@ -450,4 +452,44 @@ public class HadoopV2TaskContext extends HadoopTaskContext { throw new IgniteCheckedException(e); } } + + /** {@inheritDoc} */ + @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException { + String user = job.info().user(); + + user = IgfsUtils.fixUserName(user); + + assert user != null; + + String ugiUser; + + try { + UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); + + assert currUser != null; + + ugiUser = currUser.getShortUserName(); + } + catch (IOException ioe) { + throw new IgniteCheckedException(ioe); + } + + try { + if (F.eq(user, ugiUser)) + // if current UGI context user is the same, do direct call: + return c.call(); + else { + UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user); + + return ugi.doAs(new PrivilegedExceptionAction<T>() { + @Override public T run() throws Exception { + return c.call(); + } + }); + } + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java index b94d9d1..b9f8179 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java @@ -28,7 +28,6 @@ import org.apache.ignite.*; import org.apache.ignite.hadoop.mapreduce.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.proto.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -449,7 +448,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { * @return Configuration. */ private Configuration config(int port) { - Configuration conf = new Configuration(); + Configuration conf = HadoopUtils.safeCreateConfiguration(); setupFileSystems(conf); @@ -521,9 +520,8 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { ctx.getCounter(TestCounter.COUNTER2).increment(1); int sum = 0; - for (IntWritable value : values) { + for (IntWritable value : values) sum += value.get(); - } ctx.write(key, new IntWritable(sum)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java index d11cabb..91f926d 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.permission.*; +import org.apache.hadoop.security.*; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; @@ -39,6 +40,7 @@ import org.jsr166.*; import java.io.*; import java.net.*; +import java.security.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -58,6 +60,9 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA /** Thread count for multithreaded tests. */ private static final int THREAD_CNT = 8; + /** Secondary file system user. */ + private static final String SECONDARY_FS_USER = "secondary-default"; + /** IP finder. */ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); @@ -255,7 +260,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA if (mode != PRIMARY) cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(secondaryFileSystemUriPath(), - secondaryFileSystemConfigPath())); + secondaryFileSystemConfigPath(), SECONDARY_FS_USER)); cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName)); cfg.setManagementPort(-1); @@ -278,11 +283,28 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA primaryFsCfg.addResource(U.resolveIgniteUrl(primaryFileSystemConfigPath())); - fs = AbstractFileSystem.get(primaryFsUri, primaryFsCfg); + UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, getClientFsUser()); + + // Create Fs on behalf of the client user: + ugi.doAs(new PrivilegedExceptionAction<Object>() { + @Override public Object run() throws Exception { + fs = AbstractFileSystem.get(primaryFsUri, primaryFsCfg); + + return null; + } + }); barrier = new CyclicBarrier(THREAD_CNT); } + /** + * Gets the user the Fs client operates on bahalf of. + * @return The user the Fs client operates on bahalf of. + */ + protected String getClientFsUser() { + return "foo"; + } + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { try { @@ -297,14 +319,17 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA /** @throws Exception If failed. */ public void testStatus() throws Exception { + Path file1 = new Path("/file1"); - try (FSDataOutputStream file = fs.create(new Path("/file1"), EnumSet.noneOf(CreateFlag.class), + try (FSDataOutputStream file = fs.create(file1, EnumSet.noneOf(CreateFlag.class), Options.CreateOpts.perms(FsPermission.getDefault()))) { file.write(new byte[1024 * 1024]); } FsStatus status = fs.getFsStatus(); + assertEquals(getClientFsUser(), fs.getFileStatus(file1).getOwner()); + assertEquals(4, grid(0).cluster().nodes().size()); long used = 0, max = 0; @@ -707,6 +732,8 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA os.close(); + assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner()); + fs.setOwner(file, "aUser", "aGroup"); assertEquals("aUser", fs.getFileStatus(file).getOwner()); @@ -796,20 +823,20 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA int cnt = 2 * 1024; - FSDataOutputStream out = fs.create(file, EnumSet.noneOf(CreateFlag.class), - Options.CreateOpts.perms(FsPermission.getDefault())); + try (FSDataOutputStream out = fs.create(file, EnumSet.noneOf(CreateFlag.class), + Options.CreateOpts.perms(FsPermission.getDefault()))) { - for (long i = 0; i < cnt; i++) - out.writeLong(i); + for (long i = 0; i < cnt; i++) + out.writeLong(i); + } - out.close(); + assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner()); - FSDataInputStream in = fs.open(file, 1024); + try (FSDataInputStream in = fs.open(file, 1024)) { - for (long i = 0; i < cnt; i++) - assertEquals(i, in.readLong()); - - in.close(); + for (long i = 0; i < cnt; i++) + assertEquals(i, in.readLong()); + } } /** @throws Exception If failed. */ @@ -1052,7 +1079,9 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA } /** @throws Exception If failed. */ - public void _testRenameDirectoryIfDstPathExists() throws Exception { + public void testRenameDirectoryIfDstPathExists() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-825"); + Path fsHome = new Path(primaryFsUri); Path srcDir = new Path(fsHome, "/tmp/"); Path dstDir = new Path(fsHome, "/tmpNew/"); @@ -1191,6 +1220,9 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA assertEquals(dirPerm, fs.getFileStatus(dir).getPermission()); assertEquals(nestedDirPerm, fs.getFileStatus(nestedDir).getPermission()); + + assertEquals(getClientFsUser(), fs.getFileStatus(dir).getOwner()); + assertEquals(getClientFsUser(), fs.getFileStatus(nestedDir).getOwner()); } /** @throws Exception If failed. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java index 9e84c51..b089995 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java @@ -162,9 +162,9 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra primaryConfFullPath = null; SecondaryFileSystemProvider provider = - new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath, null); + new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath); - primaryFs = provider.createFileSystem(); + primaryFs = provider.createFileSystem(null); primaryFsUri = provider.uri(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java index d3440fc..c0f73af 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java @@ -73,10 +73,7 @@ public class IgfsNearOnlyMultiNodeSelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER).setForceServerMode(true)); FileSystemConfiguration igfsCfg = new FileSystemConfiguration();