[IGNITE-218]: corrections after review.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3fe65826
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3fe65826
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3fe65826

Branch: refs/heads/ignite-218-hdfs-only
Commit: 3fe6582638bdaecd9f8b92c5c437b456d8bec411
Parents: c03cc58
Author: iveselovskiy <iveselovs...@gridgain.com>
Authored: Thu May 28 19:29:16 2015 +0300
Committer: iveselovskiy <iveselovs...@gridgain.com>
Committed: Thu May 28 19:29:16 2015 +0300

----------------------------------------------------------------------
 .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 28 +-----
 .../hadoop/fs/HadoopLazyConcurrentMap.java      | 97 ++++++++++++++------
 .../hadoop/v2/HadoopV2TaskContext.java          |  5 +-
 3 files changed, 69 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3fe65826/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index 9ed92ad..51878da 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -466,33 +466,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
 
     /** {@inheritDoc} */
     @Override public void close() throws IgniteCheckedException {
-        final HadoopLazyConcurrentMap<String,FileSystem> map = fileSysLazyMap;
-
-        if (map == null)
-            return; // already cleared.
-
-        List<IOException> ioExs = new LinkedList<>();
-
-        // TODO: Close is not thread-safe.
-        Set<String> keySet = map.keySet();
-
-        for (String key: keySet) {
-            FileSystem fs = map.get(key);
-
-            if (fs != null) {
-                try {
-                    fs.close();
-                }
-                catch (IOException ioe) {
-                    ioExs.add(ioe);
-                }
-            }
-        }
-
-        map.clear();
-
-        if (!ioExs.isEmpty())
-            throw new IgniteCheckedException(ioExs.get(0));
+        fileSysLazyMap.close();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3fe65826/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
index cc36ea0..0fe9871 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -18,31 +18,34 @@
 package org.apache.ignite.internal.processors.hadoop.fs;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.future.*;
-// TODO: Remove unused
-import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
+import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
 
 /**
  * Maps values by keys.
  * Values are created lazily using {@link ValueFactory}.
- * Currently only {@link #clear()} method can remove a value.
  *
  * Despite of the name, does not depend on any Hadoop classes.
  */
-public class HadoopLazyConcurrentMap<K, V> {
+public class HadoopLazyConcurrentMap<K, V extends Closeable> {
     /** The map storing the actual values. */
     private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>();
 
     /** The factory passed in by the client. Will be used for lazy value creation. */
     private final ValueFactory<K, V> factory;
 
+    /** Lock used to close the objects. */
+    private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
+
+    /** Flag indicating that this map is closed and cleared. */
+    private boolean closed;
+
     /**
      * Constructor.
      * @param factory the factory to create new values lazily.
@@ -59,16 +62,30 @@ public class HadoopLazyConcurrentMap<K, V> {
      * @throws IgniteException on error
      */
     public V getOrCreate(K k) {
-        // TODO: Do "get" first.
-        final ValueWrapper wNew = new ValueWrapper(k);
-
-        ValueWrapper w = map.putIfAbsent(k, wNew);
+        ValueWrapper w = map.get(k);
 
         if (w == null) {
-            // new wrapper 'w' has been put, so init the value:
-            wNew.init();
+            final ValueWrapper wNew = new ValueWrapper(k);
+
+            w = map.putIfAbsent(k, wNew);
+
+            if (w == null) {
+                // new wrapper 'w' has been put, so init the value:
+                closeLock.readLock().lock();
 
-            w = wNew;
+                try {
+                    if (closed)
+                        throw new IllegalStateException("Failed to create value for key [" + k
+                            + "]: the map is already closed.");
+
+                    wNew.init();
+                }
+                finally {
+                    closeLock.readLock().unlock();
+                }
+
+                w = wNew;
+            }
         }
 
         try {
@@ -103,20 +120,40 @@ public class HadoopLazyConcurrentMap<K, V> {
     }
 
     /**
-     * Gets the keySet of this map,
-     * the contract is as per {@link ConcurrentMap#keySet()}
-     * @return the set of keys, never null.
+     * Clears the map and closes all the values.
      */
-    public Set<K> keySet() {
-        return map.keySet();
-    }
+    public void close() throws IgniteCheckedException {
+        closeLock.writeLock().lock();
 
-    /**
-     * Clears the map.
-     * Follows the contract of {@link ConcurrentMap#clear()}
-     */
-    public void clear() {
-        map.clear();
+        try {
+            List<IOException> ioExs = new LinkedList<>();
+
+            Set<K> keySet = map.keySet();
+
+            for (K key: keySet) {
+                ValueWrapper w = map.get(key);
+
+                if (w != null) {
+                    try {
+                        V v = w.getValue();
+
+                        v.close();
+                    }
+                    catch (IOException ioe) {
+                        ioExs.add(ioe);
+                    }
+                }
+            }
+
+            map.clear();
+
+            if (!ioExs.isEmpty())
+                throw new IgniteCheckedException(ioExs.get(0));
+
+            closed = true;
+        } finally {
+            closeLock.writeLock().unlock();
+        }
     }
 
     /**
@@ -154,11 +191,11 @@ public class HadoopLazyConcurrentMap<K, V> {
         }
 
         /**
-         * Blocks until the value is initialized.
-         * @return the value
-         * @throws IgniteInterruptedCheckedException if interrupted during wait.
+         * Gets the available value or blocks until the value is initialized.
+         * @return the value, never null.
+         * @throws IgniteCheckedException on error.
          */
-        @Nullable V getValue() throws IgniteCheckedException {
+        V getValue() throws IgniteCheckedException {
             return fut.get();
         }
     }
@@ -170,7 +207,7 @@ public class HadoopLazyConcurrentMap<K, V> {
      */
     public interface ValueFactory <K, V> {
         /**
-         * Creates the new value. Must never return null.
+         * Creates the new value. Should never return null.
          *
          * @param key the key to create value for
          * @return the value.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3fe65826/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 2270caa..dd18c66 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -239,10 +239,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
         Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
 
         try {
-            FileSystem fs = FileSystem.get(jobConf());
-
-            // TODO: Remove
-            //HadoopFileSystemsUtils.setUser(fs, jobConf().getUser());
+            FileSystem.get(jobConf());
 
             LocalFileSystem locFs = FileSystem.getLocal(jobConf());
 

Reply via email to