Author: davsclaus
Date: Thu Mar  7 14:15:27 2013
New Revision: 1453882

URL: http://svn.apache.org/r1453882
Log:
CAMEL-6142: camel-hdfs - Preserve auth as Hadoop overrides that and cause 
sideeffects for auth afterwards

Added:
    
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
      - copied unchanged from r1453879, 
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
Modified:
    camel/branches/camel-2.10.x/   (props changed)
    camel/branches/camel-2.10.x/components/camel-hdfs/pom.xml
    
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
    
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
    
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
    
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
    
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
    
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
    
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
    
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1453879

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.10.x/components/camel-hdfs/pom.xml
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/pom.xml?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/pom.xml (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/pom.xml Thu Mar  7 
14:15:27 2013
@@ -133,6 +133,11 @@
             <artifactId>slf4j-log4j12</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 

Modified: 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
 Thu Mar  7 14:15:27 2013
@@ -20,6 +20,8 @@ import java.io.IOException;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import javax.security.auth.login.Configuration;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
@@ -76,7 +78,7 @@ public final class HdfsConsumer extends 
         }
 
         // hadoop will cache the connection by default so its faster to get in 
the poll method
-        HdfsInfo answer = new HdfsInfo(this.hdfsPath.toString());
+        HdfsInfo answer = 
HdfsInfoFactory.newHdfsInfo(this.hdfsPath.toString());
 
         if (onStartup) {
             log.info("Connected to hdfs file-system {}:{}/{}", new 
Object[]{config.getHostName(), config.getPort(), hdfsPath.toString()});
@@ -90,6 +92,20 @@ public final class HdfsConsumer extends 
 
     @Override
     protected int poll() throws Exception {
+        // need to remember auth as Hadoop will override that, which otherwise 
means the Auth is broken afterwards
+        Configuration auth = Configuration.getConfiguration();
+        log.trace("Existing JAAS Configuration {}", auth);
+        try {
+            return doPoll();
+        } finally {
+            if (auth != null) {
+                log.trace("Restoring existing JAAS Configuration {}", auth);
+                Configuration.setConfiguration(auth);
+            }
+        }
+    }
+
+    protected int doPoll() throws Exception {
         class ExcludePathFilter implements PathFilter {
             public boolean accept(Path path) {
                 return !(path.toString().endsWith(config.getOpenedSuffix()) || 
path.toString().endsWith(config.getReadSuffix()));

Modified: 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
 Thu Mar  7 14:15:27 2013
@@ -96,7 +96,7 @@ public enum HdfsFileType {
         public Closeable createOutputStream(String hdfsPath, HdfsConfiguration 
configuration) {
             try {
                 Closeable rout;
-                HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
                 if (!configuration.isAppend()) {
                     rout = hdfsInfo.getFileSystem().create(hdfsInfo.getPath(), 
configuration.isOverwrite(), configuration.getBufferSize(),
                             configuration.getReplication(), 
configuration.getBlockSize(), new Progressable() {
@@ -122,7 +122,7 @@ public enum HdfsFileType {
             try {
                 Closeable rin;
                 if 
(configuration.getFileSystemType().equals(HdfsFileSystemType.LOCAL)) {
-                    HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+                    HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
                     rin = hdfsInfo.getFileSystem().open(hdfsInfo.getPath());
                 } else {
                     rin = new FileInputStream(getHfdsFileToTmpFile(hdfsPath, 
configuration));
@@ -142,7 +142,7 @@ public enum HdfsFileType {
                     outputDest.delete();
                 }
 
-                HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
                 FileSystem fileSystem = hdfsInfo.getFileSystem();
                 FileUtil.copy(fileSystem, new Path(hdfsPath), outputDest, 
false, fileSystem.getConf());
                 try {
@@ -204,7 +204,7 @@ public enum HdfsFileType {
         public Closeable createOutputStream(String hdfsPath, HdfsConfiguration 
configuration) {
             try {
                 Closeable rout;
-                HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
                 Class<?> keyWritableClass = 
configuration.getKeyType().getWritableClass();
                 Class<?> valueWritableClass = 
configuration.getValueType().getWritableClass();
                 rout = SequenceFile.createWriter(hdfsInfo.getFileSystem(), 
hdfsInfo.getConf(), hdfsInfo.getPath(), keyWritableClass,
@@ -224,7 +224,7 @@ public enum HdfsFileType {
         public Closeable createInputStream(String hdfsPath, HdfsConfiguration 
configuration) {
             try {
                 Closeable rin;
-                HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
                 rin = new SequenceFile.Reader(hdfsInfo.getFileSystem(), 
hdfsInfo.getPath(), hdfsInfo.getConf());
                 return rin;
             } catch (IOException ex) {
@@ -273,7 +273,7 @@ public enum HdfsFileType {
         public Closeable createOutputStream(String hdfsPath, HdfsConfiguration 
configuration) {
             try {
                 Closeable rout;
-                HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
                 Class<? extends WritableComparable> keyWritableClass = 
configuration.getKeyType().getWritableClass();
                 Class<? extends WritableComparable> valueWritableClass = 
configuration.getValueType().getWritableClass();
                 rout = new MapFile.Writer(hdfsInfo.getConf(), 
hdfsInfo.getFileSystem(), hdfsPath, keyWritableClass, valueWritableClass,
@@ -292,7 +292,7 @@ public enum HdfsFileType {
         public Closeable createInputStream(String hdfsPath, HdfsConfiguration 
configuration) {
             try {
                 Closeable rin;
-                HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
                 rin = new MapFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, 
hdfsInfo.getConf());
                 return rin;
             } catch (IOException ex) {
@@ -341,7 +341,7 @@ public enum HdfsFileType {
         public Closeable createOutputStream(String hdfsPath, HdfsConfiguration 
configuration) {
             try {
                 Closeable rout;
-                HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
                 Class<? extends WritableComparable> keyWritableClass = 
configuration.getKeyType().getWritableClass();
                 Class<? extends WritableComparable> valueWritableClass = 
configuration.getValueType().getWritableClass();
                 rout = new BloomMapFile.Writer(hdfsInfo.getConf(), 
hdfsInfo.getFileSystem(), hdfsPath, keyWritableClass, valueWritableClass,
@@ -360,7 +360,7 @@ public enum HdfsFileType {
         public Closeable createInputStream(String hdfsPath, HdfsConfiguration 
configuration) {
             try {
                 Closeable rin;
-                HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
                 rin = new BloomMapFile.Reader(hdfsInfo.getFileSystem(), 
hdfsPath, hdfsInfo.getConf());
                 return rin;
             } catch (IOException ex) {
@@ -404,7 +404,7 @@ public enum HdfsFileType {
         public Closeable createOutputStream(String hdfsPath, HdfsConfiguration 
configuration) {
             try {
                 Closeable rout;
-                HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
                 Class<? extends WritableComparable> valueWritableClass = 
configuration.getValueType().getWritableClass();
                 rout = new ArrayFile.Writer(hdfsInfo.getConf(), 
hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
                         configuration.getCompressionType(), new Progressable() 
{
@@ -422,7 +422,7 @@ public enum HdfsFileType {
         public Closeable createInputStream(String hdfsPath, HdfsConfiguration 
configuration) {
             try {
                 Closeable rin;
-                HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
                 rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, 
hdfsInfo.getConf());
                 return rin;
             } catch (IOException ex) {

Modified: 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
 Thu Mar  7 14:15:27 2013
@@ -23,13 +23,13 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-public class HdfsInfo {
+public final class HdfsInfo {
 
     private Configuration conf;
     private FileSystem fileSystem;
     private Path path;
 
-    public HdfsInfo(String hdfsPath) throws IOException {
+    HdfsInfo(String hdfsPath) throws IOException {
         this.conf = new Configuration();
         // this will connect to the hadoop hdfs file system, and in case of no 
connection
         // then the hardcoded timeout in hadoop is 45 x 20 sec = 15 minutes
@@ -37,15 +37,15 @@ public class HdfsInfo {
         this.path = new Path(hdfsPath);
     }
 
-    public final Configuration getConf() {
+    public Configuration getConf() {
         return conf;
     }
 
-    public final FileSystem getFileSystem() {
+    public FileSystem getFileSystem() {
         return fileSystem;
     }
 
-    public final Path getPath() {
+    public Path getPath() {
         return path;
     }
 }

Modified: 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
 Thu Mar  7 14:15:27 2013
@@ -19,12 +19,16 @@ package org.apache.camel.component.hdfs;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
+import javax.security.auth.login.Configuration;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HdfsInputStream implements Closeable {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(HdfsInputStream.class);
     private HdfsFileType fileType;
     private String actualPath;
     private String suffixedPath;
@@ -43,7 +47,7 @@ public class HdfsInputStream implements 
         ret.actualPath = hdfsPath;
         ret.suffixedPath = ret.actualPath + '.' + 
configuration.getOpenedSuffix();
         ret.chunkSize = configuration.getChunkSize();
-        HdfsInfo info = new HdfsInfo(ret.actualPath);
+        HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath);
         info.getFileSystem().rename(new Path(ret.actualPath), new 
Path(ret.suffixedPath));
         ret.in = ret.fileType.createInputStream(ret.suffixedPath, 
configuration);
         ret.opened = true;
@@ -54,7 +58,7 @@ public class HdfsInputStream implements 
     public final void close() throws IOException {
         if (opened) {
             IOUtils.closeStream(in);
-            HdfsInfo info = new HdfsInfo(actualPath);
+            HdfsInfo info = HdfsInfoFactory.newHdfsInfo(actualPath);
             info.getFileSystem().rename(new Path(suffixedPath), new 
Path(actualPath + '.' + HdfsConstants.DEFAULT_READ_SUFFIX));
             opened = false;
         }

Modified: 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
 Thu Mar  7 14:15:27 2013
@@ -29,6 +29,7 @@ import org.apache.hadoop.io.IOUtils;
 public class HdfsOutputStream implements Closeable {
 
     private HdfsFileType fileType;
+    private HdfsInfo info;
     private String actualPath;
     private String suffixedPath;
     private Closeable out;
@@ -45,21 +46,21 @@ public class HdfsOutputStream implements
         HdfsOutputStream ret = new HdfsOutputStream();
         ret.fileType = configuration.getFileType();
         ret.actualPath = hdfsPath;
-        HdfsInfo info = new HdfsInfo(ret.actualPath);
+        ret.info = new HdfsInfo(ret.actualPath);
 
         ret.suffixedPath = ret.actualPath + '.' + 
configuration.getOpenedSuffix();
         if (configuration.isWantAppend() || configuration.isAppend()) {
-            if (!info.getFileSystem().exists(new Path(ret.actualPath))) {
+            if (!ret.info.getFileSystem().exists(new Path(ret.actualPath))) {
                 configuration.setAppend(false);
             } else {
                 configuration.setAppend(true);
-                info = new HdfsInfo(ret.suffixedPath);
-                info.getFileSystem().rename(new Path(ret.actualPath), new 
Path(ret.suffixedPath));
+                ret.info = new HdfsInfo(ret.suffixedPath);
+                ret.info.getFileSystem().rename(new Path(ret.actualPath), new 
Path(ret.suffixedPath));
             }
         } else {
-            if (info.getFileSystem().exists(new Path(ret.actualPath))) {
+            if (ret.info.getFileSystem().exists(new Path(ret.actualPath))) {
                 if (configuration.isOverwrite()) {
-                    info.getFileSystem().delete(new Path(ret.actualPath), 
true);
+                    ret.info.getFileSystem().delete(new Path(ret.actualPath), 
true);
                 } else {
                     throw new RuntimeCamelException("The file already exists");
                 }
@@ -74,7 +75,6 @@ public class HdfsOutputStream implements
     public void close() throws IOException {
         if (opened) {
             IOUtils.closeStream(out);
-            HdfsInfo info = new HdfsInfo(actualPath);
             info.getFileSystem().rename(new Path(suffixedPath), new 
Path(actualPath));
             opened = false;
         }

Modified: 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
 Thu Mar  7 14:15:27 2013
@@ -22,6 +22,8 @@ import java.util.concurrent.ScheduledExe
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.security.auth.login.Configuration;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.IOHelper;
@@ -91,24 +93,34 @@ public class HdfsProducer extends Defaul
 
     @Override
     protected void doStart() throws Exception {
-        super.doStart();
-
-        // setup hdfs if configured to do on startup
-        if (getEndpoint().getConfig().isConnectOnStartup()) {
-            ostream = setupHdfs(true);
-        }
+        // need to remember auth as Hadoop will override that, which otherwise 
means the Auth is broken afterwards
+        Configuration auth = Configuration.getConfiguration();
+        log.trace("Existing JAAS Configuration {}", auth);
+        try {
+            super.doStart();
+
+            // setup hdfs if configured to do on startup
+            if (getEndpoint().getConfig().isConnectOnStartup()) {
+                ostream = setupHdfs(true);
+            }
 
-        SplitStrategy idleStrategy = null;
-        for (SplitStrategy strategy : config.getSplitStrategies()) {
-            if (strategy.type == SplitStrategyType.IDLE) {
-                idleStrategy = strategy;
-                break;
+            SplitStrategy idleStrategy = null;
+            for (SplitStrategy strategy : config.getSplitStrategies()) {
+                if (strategy.type == SplitStrategyType.IDLE) {
+                    idleStrategy = strategy;
+                    break;
+                }
+            }
+            if (idleStrategy != null) {
+                scheduler = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
 "HdfsIdleCheck");
+                log.debug("Creating IdleCheck task scheduled to run every {} 
millis", config.getCheckIdleInterval());
+                scheduler.scheduleAtFixedRate(new IdleCheck(idleStrategy), 
config.getCheckIdleInterval(), config.getCheckIdleInterval(), 
TimeUnit.MILLISECONDS);
+            }
+        } finally {
+            if (auth != null) {
+                log.trace("Restoring existing JAAS Configuration {}", auth);
+                Configuration.setConfiguration(auth);
             }
-        }
-        if (idleStrategy != null) {
-            scheduler = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
 "HdfsIdleCheck");
-            log.debug("Creating IdleCheck task scheduled to run every {} 
millis", config.getCheckIdleInterval());
-            scheduler.scheduleAtFixedRate(new IdleCheck(idleStrategy), 
config.getCheckIdleInterval(), config.getCheckIdleInterval(), 
TimeUnit.MILLISECONDS);
         }
     }
 
@@ -159,6 +171,20 @@ public class HdfsProducer extends Defaul
 
     @Override
     public void process(Exchange exchange) throws Exception {
+        // need to remember auth as Hadoop will override that, which otherwise 
means the Auth is broken afterwards
+        Configuration auth = Configuration.getConfiguration();
+        log.trace("Existing JAAS Configuration {}", auth);
+        try {
+            doProcess(exchange);
+        } finally {
+            if (auth != null) {
+                log.trace("Restoring existing JAAS Configuration {}", auth);
+                Configuration.setConfiguration(auth);
+            }
+        }
+    }
+
+    void doProcess(Exchange exchange) throws Exception {
         Object body = exchange.getIn().getBody();
         Object key = exchange.getIn().getHeader(HdfsHeader.KEY.name());
 

Modified: 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
 Thu Mar  7 14:15:27 2013
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.TypeConverter;
+import org.apache.camel.util.IOHelper;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.BytesWritable;
@@ -283,7 +284,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, 
Holder<Integer> size) {
-            InputStream  is = null;
+            InputStream is = null;
             try {
                 is = typeConverter.convertTo(InputStream.class, value);
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -295,13 +296,7 @@ public class HdfsWritableFactories {
             } catch (IOException ex) {
                 throw new RuntimeCamelException(ex);
             } finally {
-                if (is != null) {
-                    try {
-                        is.close();
-                    } catch (IOException e) {
-                        throw new RuntimeException("Error closing stream", e);
-                    }
-                }
+                IOHelper.close(is);
             }
         }
 

Modified: 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
 Thu Mar  7 14:15:27 2013
@@ -16,7 +16,7 @@
  */
 package org.apache.camel.component.hdfs;
 
-public class Holder<T> {
+public final class Holder<T> {
 
     /**
      * The value contained in the holder.


Reply via email to