[IGNITE-958]: IGNITE-218 (Wrong staging permissions while running MR job under hadoop accelerator): IGFS part.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8455c7a6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8455c7a6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8455c7a6 Branch: refs/heads/ignite-916 Commit: 8455c7a6ed6f7449c7ad31b1ef7b129705262e1b Parents: 3538819 Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Fri May 29 15:40:26 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Fri May 29 15:40:26 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/igfs/IgfsUserContext.java | 119 +++++++++++ .../hadoop/fs/HadoopLazyConcurrentMap.java | 204 +++++++++++++++++++ 2 files changed, 323 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8455c7a6/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java new file mode 100644 index 0000000..5a65bdb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.concurrent.*; + +/** + * Provides ability to execute IGFS code in a context of a specific user. + */ +public abstract class IgfsUserContext { + /** Thread local to hold the current user context. */ + private static final ThreadLocal<String> userStackThreadLocal = new ThreadLocal<>(); + + /** + * Executes given callable in the given user context. + * The main contract of this method is that {@link #currentUser()} method invoked + * inside closure always returns 'user' this callable executed with. + * @param user the user name to invoke closure on behalf of. + * @param clo the closure to execute + * @param <T> The type of closure result. + * @return the result of closure execution. + * @throws IllegalArgumentException if user name is null or empty String or if the closure is null. + */ + public static <T> T doAs(String user, final IgniteOutClosure<T> clo) { + if (F.isEmpty(user)) + throw new IllegalArgumentException("Failed to use null or empty user name."); + + final String ctxUser = userStackThreadLocal.get(); + + if (F.eq(ctxUser, user)) + return clo.apply(); // correct context is already there + + userStackThreadLocal.set(user); + + try { + return clo.apply(); + } + finally { + userStackThreadLocal.set(ctxUser); + } + } + + /** + * Same contract that {@link #doAs(String, IgniteOutClosure)} has, but accepts + * callable that throws checked Exception. + * The Exception is not ever wrapped anyhow. + * If your Callable throws Some specific checked Exceptions, the recommended usage pattern is: + * <pre name="code" class="java"> + * public Foo myOperation() throws MyCheckedException1, MyCheckedException2 { + * try { + * return IgfsUserContext.doAs(user, new Callable<Foo>() { + * @Override public Foo call() throws MyCheckedException1, MyCheckedException2 { + * return makeSomeFoo(); // do the job + * } + * }); + * } + * catch (MyCheckedException1 | MyCheckedException2 | RuntimeException | Error e) { + * throw e; + * } + * catch (Exception e) { + * throw new AssertionError("Must never go there."); + * } + * } + * </pre> + * @param user the user name to invoke closure on behalf of. + * @param clbl the Callable to execute + * @param <T> The type of callable result. + * @return the result of closure execution. + * @throws IllegalArgumentException if user name is null or empty String or if the closure is null. + */ + public static <T> T doAs(String user, final Callable<T> clbl) throws Exception { + if (F.isEmpty(user)) + throw new IllegalArgumentException("Failed to use null or empty user name."); + + final String ctxUser = userStackThreadLocal.get(); + + if (F.eq(ctxUser, user)) + return clbl.call(); // correct context is already there + + userStackThreadLocal.set(user); + + try { + return clbl.call(); + } + finally { + userStackThreadLocal.set(ctxUser); + } + } + + /** + * Gets the current context user. + * If this method is invoked outside of any {@link #doAs(String, IgniteOutClosure)} on the call stack, it will + * return null. Otherwise it will return the user name set in the most lower + * {@link #doAs(String, IgniteOutClosure)} call on the call stack. + * @return The current user, may be null. + */ + @Nullable public static String currentUser() { + return userStackThreadLocal.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8455c7a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java new file mode 100644 index 0000000..71b38c4 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.future.*; +import org.jsr166.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.*; + +/** + * Maps values by keys. + * Values are created lazily using {@link ValueFactory}. + * + * Despite of the name, does not depend on any Hadoop classes. + */ +public class HadoopLazyConcurrentMap<K, V extends Closeable> { + /** The map storing the actual values. */ + private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>(); + + /** The factory passed in by the client. Will be used for lazy value creation. */ + private final ValueFactory<K, V> factory; + + /** Lock used to close the objects. */ + private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); + + /** Flag indicating that this map is closed and cleared. */ + private boolean closed; + + /** + * Constructor. + * @param factory the factory to create new values lazily. + */ + public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) { + this.factory = factory; + } + + /** + * Gets cached or creates a new value of V. + * Never returns null. + * @param k the key to associate the value with. + * @return the cached or newly created value, never null. + * @throws IgniteException on error + */ + public V getOrCreate(K k) { + ValueWrapper w = map.get(k); + + if (w == null) { + closeLock.readLock().lock(); + + try { + if (closed) + throw new IllegalStateException("Failed to create value for key [" + k + + "]: the map is already closed."); + + final ValueWrapper wNew = new ValueWrapper(k); + + w = map.putIfAbsent(k, wNew); + + if (w == null) { + wNew.init(); + + w = wNew; + } + } + finally { + closeLock.readLock().unlock(); + } + } + + try { + V v = w.getValue(); + + assert v != null; + + return v; + } + catch (IgniteCheckedException ie) { + throw new IgniteException(ie); + } + } + + /** + * Clears the map and closes all the values. + */ + public void close() throws IgniteCheckedException { + closeLock.writeLock().lock(); + + try { + closed = true; + + Exception err = null; + + Set<K> keySet = map.keySet(); + + for (K key : keySet) { + V v = null; + + try { + v = map.get(key).getValue(); + } + catch (IgniteCheckedException ignore) { + // No-op. + } + + if (v != null) { + try { + v.close(); + } + catch (Exception err0) { + if (err == null) + err = err0; + } + } + } + + map.clear(); + + if (err != null) + throw new IgniteCheckedException(err); + } + finally { + closeLock.writeLock().unlock(); + } + } + + /** + * Helper class that drives the lazy value creation. + */ + private class ValueWrapper { + /** Future. */ + private final GridFutureAdapter<V> fut = new GridFutureAdapter<>(); + + /** the key */ + private final K key; + + /** + * Creates new wrapper. + */ + private ValueWrapper(K key) { + this.key = key; + } + + /** + * Initializes the value using the factory. + */ + private void init() { + try { + final V v0 = factory.createValue(key); + + if (v0 == null) + throw new IgniteException("Failed to create non-null value. [key=" + key + ']'); + + fut.onDone(v0); + } + catch (Throwable e) { + fut.onDone(e); + } + } + + /** + * Gets the available value or blocks until the value is initialized. + * @return the value, never null. + * @throws IgniteCheckedException on error. + */ + V getValue() throws IgniteCheckedException { + return fut.get(); + } + } + + /** + * Interface representing the factory that creates map values. + * @param <K> the type of the key. + * @param <V> the type of the value. + */ + public interface ValueFactory <K, V> { + /** + * Creates the new value. Should never return null. + * + * @param key the key to create value for + * @return the value. + * @throws IgniteException on failure. + */ + public V createValue(K key); + } +} \ No newline at end of file