[IGNITE-218]: corrections after review.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3fe65826 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3fe65826 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3fe65826 Branch: refs/heads/ignite-218-hdfs-only Commit: 3fe6582638bdaecd9f8b92c5c437b456d8bec411 Parents: c03cc58 Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Thu May 28 19:29:16 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Thu May 28 19:29:16 2015 +0300 ---------------------------------------------------------------------- .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 28 +----- .../hadoop/fs/HadoopLazyConcurrentMap.java | 97 ++++++++++++++------ .../hadoop/v2/HadoopV2TaskContext.java | 5 +- 3 files changed, 69 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3fe65826/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java index 9ed92ad..51878da 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java @@ -466,33 +466,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public void close() throws IgniteCheckedException { - final HadoopLazyConcurrentMap<String,FileSystem> map = fileSysLazyMap; - - if (map == null) - return; // already cleared. - - List<IOException> ioExs = new LinkedList<>(); - - // TODO: Close is not thread-safe. - Set<String> keySet = map.keySet(); - - for (String key: keySet) { - FileSystem fs = map.get(key); - - if (fs != null) { - try { - fs.close(); - } - catch (IOException ioe) { - ioExs.add(ioe); - } - } - } - - map.clear(); - - if (!ioExs.isEmpty()) - throw new IgniteCheckedException(ioExs.get(0)); + fileSysLazyMap.close(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3fe65826/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java index cc36ea0..0fe9871 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java @@ -18,31 +18,34 @@ package org.apache.ignite.internal.processors.hadoop.fs; import org.apache.ignite.*; -import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.future.*; -// TODO: Remove unused -import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import org.jsr166.*; +import java.io.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; /** * Maps values by keys. * Values are created lazily using {@link ValueFactory}. - * Currently only {@link #clear()} method can remove a value. * * Despite of the name, does not depend on any Hadoop classes. */ -public class HadoopLazyConcurrentMap<K, V> { +public class HadoopLazyConcurrentMap<K, V extends Closeable> { /** The map storing the actual values. */ private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>(); /** The factory passed in by the client. Will be used for lazy value creation. */ private final ValueFactory<K, V> factory; + /** Lock used to close the objects. */ + private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); + + /** Flag indicating that this map is closed and cleared. */ + private boolean closed; + /** * Constructor. * @param factory the factory to create new values lazily. @@ -59,16 +62,30 @@ public class HadoopLazyConcurrentMap<K, V> { * @throws IgniteException on error */ public V getOrCreate(K k) { - // TODO: Do "get" first. - final ValueWrapper wNew = new ValueWrapper(k); - - ValueWrapper w = map.putIfAbsent(k, wNew); + ValueWrapper w = map.get(k); if (w == null) { - // new wrapper 'w' has been put, so init the value: - wNew.init(); + final ValueWrapper wNew = new ValueWrapper(k); + + w = map.putIfAbsent(k, wNew); + + if (w == null) { + // new wrapper 'w' has been put, so init the value: + closeLock.readLock().lock(); - w = wNew; + try { + if (closed) + throw new IllegalStateException("Failed to create value for key [" + k + + "]: the map is already closed."); + + wNew.init(); + } + finally { + closeLock.readLock().unlock(); + } + + w = wNew; + } } try { @@ -103,20 +120,40 @@ public class HadoopLazyConcurrentMap<K, V> { } /** - * Gets the keySet of this map, - * the contract is as per {@link ConcurrentMap#keySet()} - * @return the set of keys, never null. + * Clears the map and closes all the values. */ - public Set<K> keySet() { - return map.keySet(); - } + public void close() throws IgniteCheckedException { + closeLock.writeLock().lock(); - /** - * Clears the map. - * Follows the contract of {@link ConcurrentMap#clear()} - */ - public void clear() { - map.clear(); + try { + List<IOException> ioExs = new LinkedList<>(); + + Set<K> keySet = map.keySet(); + + for (K key: keySet) { + ValueWrapper w = map.get(key); + + if (w != null) { + try { + V v = w.getValue(); + + v.close(); + } + catch (IOException ioe) { + ioExs.add(ioe); + } + } + } + + map.clear(); + + if (!ioExs.isEmpty()) + throw new IgniteCheckedException(ioExs.get(0)); + + closed = true; + } finally { + closeLock.writeLock().unlock(); + } } /** @@ -154,11 +191,11 @@ public class HadoopLazyConcurrentMap<K, V> { } /** - * Blocks until the value is initialized. - * @return the value - * @throws IgniteInterruptedCheckedException if interrupted during wait. + * Gets the available value or blocks until the value is initialized. + * @return the value, never null. + * @throws IgniteCheckedException on error. */ - @Nullable V getValue() throws IgniteCheckedException { + V getValue() throws IgniteCheckedException { return fut.get(); } } @@ -170,7 +207,7 @@ public class HadoopLazyConcurrentMap<K, V> { */ public interface ValueFactory <K, V> { /** - * Creates the new value. Must never return null. + * Creates the new value. Should never return null. * * @param key the key to create value for * @return the value. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3fe65826/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index 2270caa..dd18c66 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -239,10 +239,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { Thread.currentThread().setContextClassLoader(jobConf().getClassLoader()); try { - FileSystem fs = FileSystem.get(jobConf()); - - // TODO: Remove - //HadoopFileSystemsUtils.setUser(fs, jobConf().getUser()); + FileSystem.get(jobConf()); LocalFileSystem locFs = FileSystem.getLocal(jobConf());