Author: davsclaus
Date: Thu Mar 7 14:15:27 2013
New Revision: 1453882
URL: http://svn.apache.org/r1453882
Log:
CAMEL-6142: camel-hdfs - Preserve auth, as Hadoop overrides it and causes
side effects for auth afterwards
Added:
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
- copied unchanged from r1453879,
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
Modified:
camel/branches/camel-2.10.x/ (props changed)
camel/branches/camel-2.10.x/components/camel-hdfs/pom.xml
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Merged /camel/trunk:r1453879
Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: camel/branches/camel-2.10.x/components/camel-hdfs/pom.xml
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/pom.xml?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/pom.xml (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/pom.xml Thu Mar 7
14:15:27 2013
@@ -133,6 +133,11 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
Modified:
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
Thu Mar 7 14:15:27 2013
@@ -20,6 +20,8 @@ import java.io.IOException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.security.auth.login.Configuration;
+
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
@@ -76,7 +78,7 @@ public final class HdfsConsumer extends
}
// hadoop will cache the connection by default so its faster to get in
the poll method
- HdfsInfo answer = new HdfsInfo(this.hdfsPath.toString());
+ HdfsInfo answer =
HdfsInfoFactory.newHdfsInfo(this.hdfsPath.toString());
if (onStartup) {
log.info("Connected to hdfs file-system {}:{}/{}", new
Object[]{config.getHostName(), config.getPort(), hdfsPath.toString()});
@@ -90,6 +92,20 @@ public final class HdfsConsumer extends
@Override
protected int poll() throws Exception {
+ // need to remember auth as Hadoop will override that, which otherwise
means the Auth is broken afterwards
+ Configuration auth = Configuration.getConfiguration();
+ log.trace("Existing JAAS Configuration {}", auth);
+ try {
+ return doPoll();
+ } finally {
+ if (auth != null) {
+ log.trace("Restoring existing JAAS Configuration {}", auth);
+ Configuration.setConfiguration(auth);
+ }
+ }
+ }
+
+ protected int doPoll() throws Exception {
class ExcludePathFilter implements PathFilter {
public boolean accept(Path path) {
return !(path.toString().endsWith(config.getOpenedSuffix()) ||
path.toString().endsWith(config.getReadSuffix()));
Modified:
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
Thu Mar 7 14:15:27 2013
@@ -96,7 +96,7 @@ public enum HdfsFileType {
public Closeable createOutputStream(String hdfsPath, HdfsConfiguration
configuration) {
try {
Closeable rout;
- HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+ HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
if (!configuration.isAppend()) {
rout = hdfsInfo.getFileSystem().create(hdfsInfo.getPath(),
configuration.isOverwrite(), configuration.getBufferSize(),
configuration.getReplication(),
configuration.getBlockSize(), new Progressable() {
@@ -122,7 +122,7 @@ public enum HdfsFileType {
try {
Closeable rin;
if
(configuration.getFileSystemType().equals(HdfsFileSystemType.LOCAL)) {
- HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+ HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
rin = hdfsInfo.getFileSystem().open(hdfsInfo.getPath());
} else {
rin = new FileInputStream(getHfdsFileToTmpFile(hdfsPath,
configuration));
@@ -142,7 +142,7 @@ public enum HdfsFileType {
outputDest.delete();
}
- HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+ HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
FileSystem fileSystem = hdfsInfo.getFileSystem();
FileUtil.copy(fileSystem, new Path(hdfsPath), outputDest,
false, fileSystem.getConf());
try {
@@ -204,7 +204,7 @@ public enum HdfsFileType {
public Closeable createOutputStream(String hdfsPath, HdfsConfiguration
configuration) {
try {
Closeable rout;
- HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+ HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
Class<?> keyWritableClass =
configuration.getKeyType().getWritableClass();
Class<?> valueWritableClass =
configuration.getValueType().getWritableClass();
rout = SequenceFile.createWriter(hdfsInfo.getFileSystem(),
hdfsInfo.getConf(), hdfsInfo.getPath(), keyWritableClass,
@@ -224,7 +224,7 @@ public enum HdfsFileType {
public Closeable createInputStream(String hdfsPath, HdfsConfiguration
configuration) {
try {
Closeable rin;
- HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+ HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
rin = new SequenceFile.Reader(hdfsInfo.getFileSystem(),
hdfsInfo.getPath(), hdfsInfo.getConf());
return rin;
} catch (IOException ex) {
@@ -273,7 +273,7 @@ public enum HdfsFileType {
public Closeable createOutputStream(String hdfsPath, HdfsConfiguration
configuration) {
try {
Closeable rout;
- HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+ HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
Class<? extends WritableComparable> keyWritableClass =
configuration.getKeyType().getWritableClass();
Class<? extends WritableComparable> valueWritableClass =
configuration.getValueType().getWritableClass();
rout = new MapFile.Writer(hdfsInfo.getConf(),
hdfsInfo.getFileSystem(), hdfsPath, keyWritableClass, valueWritableClass,
@@ -292,7 +292,7 @@ public enum HdfsFileType {
public Closeable createInputStream(String hdfsPath, HdfsConfiguration
configuration) {
try {
Closeable rin;
- HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+ HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
rin = new MapFile.Reader(hdfsInfo.getFileSystem(), hdfsPath,
hdfsInfo.getConf());
return rin;
} catch (IOException ex) {
@@ -341,7 +341,7 @@ public enum HdfsFileType {
public Closeable createOutputStream(String hdfsPath, HdfsConfiguration
configuration) {
try {
Closeable rout;
- HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+ HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
Class<? extends WritableComparable> keyWritableClass =
configuration.getKeyType().getWritableClass();
Class<? extends WritableComparable> valueWritableClass =
configuration.getValueType().getWritableClass();
rout = new BloomMapFile.Writer(hdfsInfo.getConf(),
hdfsInfo.getFileSystem(), hdfsPath, keyWritableClass, valueWritableClass,
@@ -360,7 +360,7 @@ public enum HdfsFileType {
public Closeable createInputStream(String hdfsPath, HdfsConfiguration
configuration) {
try {
Closeable rin;
- HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+ HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
rin = new BloomMapFile.Reader(hdfsInfo.getFileSystem(),
hdfsPath, hdfsInfo.getConf());
return rin;
} catch (IOException ex) {
@@ -404,7 +404,7 @@ public enum HdfsFileType {
public Closeable createOutputStream(String hdfsPath, HdfsConfiguration
configuration) {
try {
Closeable rout;
- HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+ HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
Class<? extends WritableComparable> valueWritableClass =
configuration.getValueType().getWritableClass();
rout = new ArrayFile.Writer(hdfsInfo.getConf(),
hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
configuration.getCompressionType(), new Progressable()
{
@@ -422,7 +422,7 @@ public enum HdfsFileType {
public Closeable createInputStream(String hdfsPath, HdfsConfiguration
configuration) {
try {
Closeable rin;
- HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+ HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath,
hdfsInfo.getConf());
return rin;
} catch (IOException ex) {
Modified:
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
Thu Mar 7 14:15:27 2013
@@ -23,13 +23,13 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-public class HdfsInfo {
+public final class HdfsInfo {
private Configuration conf;
private FileSystem fileSystem;
private Path path;
- public HdfsInfo(String hdfsPath) throws IOException {
+ HdfsInfo(String hdfsPath) throws IOException {
this.conf = new Configuration();
// this will connect to the hadoop hdfs file system, and in case of no
connection
// then the hardcoded timeout in hadoop is 45 x 20 sec = 15 minutes
@@ -37,15 +37,15 @@ public class HdfsInfo {
this.path = new Path(hdfsPath);
}
- public final Configuration getConf() {
+ public Configuration getConf() {
return conf;
}
- public final FileSystem getFileSystem() {
+ public FileSystem getFileSystem() {
return fileSystem;
}
- public final Path getPath() {
+ public Path getPath() {
return path;
}
}
Modified:
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
Thu Mar 7 14:15:27 2013
@@ -19,12 +19,16 @@ package org.apache.camel.component.hdfs;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
+import javax.security.auth.login.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HdfsInputStream implements Closeable {
+ private static final Logger LOG =
LoggerFactory.getLogger(HdfsInputStream.class);
private HdfsFileType fileType;
private String actualPath;
private String suffixedPath;
@@ -43,7 +47,7 @@ public class HdfsInputStream implements
ret.actualPath = hdfsPath;
ret.suffixedPath = ret.actualPath + '.' +
configuration.getOpenedSuffix();
ret.chunkSize = configuration.getChunkSize();
- HdfsInfo info = new HdfsInfo(ret.actualPath);
+ HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath);
info.getFileSystem().rename(new Path(ret.actualPath), new
Path(ret.suffixedPath));
ret.in = ret.fileType.createInputStream(ret.suffixedPath,
configuration);
ret.opened = true;
@@ -54,7 +58,7 @@ public class HdfsInputStream implements
public final void close() throws IOException {
if (opened) {
IOUtils.closeStream(in);
- HdfsInfo info = new HdfsInfo(actualPath);
+ HdfsInfo info = HdfsInfoFactory.newHdfsInfo(actualPath);
info.getFileSystem().rename(new Path(suffixedPath), new
Path(actualPath + '.' + HdfsConstants.DEFAULT_READ_SUFFIX));
opened = false;
}
Modified:
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
Thu Mar 7 14:15:27 2013
@@ -29,6 +29,7 @@ import org.apache.hadoop.io.IOUtils;
public class HdfsOutputStream implements Closeable {
private HdfsFileType fileType;
+ private HdfsInfo info;
private String actualPath;
private String suffixedPath;
private Closeable out;
@@ -45,21 +46,21 @@ public class HdfsOutputStream implements
HdfsOutputStream ret = new HdfsOutputStream();
ret.fileType = configuration.getFileType();
ret.actualPath = hdfsPath;
- HdfsInfo info = new HdfsInfo(ret.actualPath);
+ ret.info = new HdfsInfo(ret.actualPath);
ret.suffixedPath = ret.actualPath + '.' +
configuration.getOpenedSuffix();
if (configuration.isWantAppend() || configuration.isAppend()) {
- if (!info.getFileSystem().exists(new Path(ret.actualPath))) {
+ if (!ret.info.getFileSystem().exists(new Path(ret.actualPath))) {
configuration.setAppend(false);
} else {
configuration.setAppend(true);
- info = new HdfsInfo(ret.suffixedPath);
- info.getFileSystem().rename(new Path(ret.actualPath), new
Path(ret.suffixedPath));
+ ret.info = new HdfsInfo(ret.suffixedPath);
+ ret.info.getFileSystem().rename(new Path(ret.actualPath), new
Path(ret.suffixedPath));
}
} else {
- if (info.getFileSystem().exists(new Path(ret.actualPath))) {
+ if (ret.info.getFileSystem().exists(new Path(ret.actualPath))) {
if (configuration.isOverwrite()) {
- info.getFileSystem().delete(new Path(ret.actualPath),
true);
+ ret.info.getFileSystem().delete(new Path(ret.actualPath),
true);
} else {
throw new RuntimeCamelException("The file already exists");
}
@@ -74,7 +75,6 @@ public class HdfsOutputStream implements
public void close() throws IOException {
if (opened) {
IOUtils.closeStream(out);
- HdfsInfo info = new HdfsInfo(actualPath);
info.getFileSystem().rename(new Path(suffixedPath), new
Path(actualPath));
opened = false;
}
Modified:
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
Thu Mar 7 14:15:27 2013
@@ -22,6 +22,8 @@ import java.util.concurrent.ScheduledExe
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.security.auth.login.Configuration;
+
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.IOHelper;
@@ -91,24 +93,34 @@ public class HdfsProducer extends Defaul
@Override
protected void doStart() throws Exception {
- super.doStart();
-
- // setup hdfs if configured to do on startup
- if (getEndpoint().getConfig().isConnectOnStartup()) {
- ostream = setupHdfs(true);
- }
+ // need to remember auth as Hadoop will override that, which otherwise
means the Auth is broken afterwards
+ Configuration auth = Configuration.getConfiguration();
+ log.trace("Existing JAAS Configuration {}", auth);
+ try {
+ super.doStart();
+
+ // setup hdfs if configured to do on startup
+ if (getEndpoint().getConfig().isConnectOnStartup()) {
+ ostream = setupHdfs(true);
+ }
- SplitStrategy idleStrategy = null;
- for (SplitStrategy strategy : config.getSplitStrategies()) {
- if (strategy.type == SplitStrategyType.IDLE) {
- idleStrategy = strategy;
- break;
+ SplitStrategy idleStrategy = null;
+ for (SplitStrategy strategy : config.getSplitStrategies()) {
+ if (strategy.type == SplitStrategyType.IDLE) {
+ idleStrategy = strategy;
+ break;
+ }
+ }
+ if (idleStrategy != null) {
+ scheduler =
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
"HdfsIdleCheck");
+ log.debug("Creating IdleCheck task scheduled to run every {}
millis", config.getCheckIdleInterval());
+ scheduler.scheduleAtFixedRate(new IdleCheck(idleStrategy),
config.getCheckIdleInterval(), config.getCheckIdleInterval(),
TimeUnit.MILLISECONDS);
+ }
+ } finally {
+ if (auth != null) {
+ log.trace("Restoring existing JAAS Configuration {}", auth);
+ Configuration.setConfiguration(auth);
}
- }
- if (idleStrategy != null) {
- scheduler =
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
"HdfsIdleCheck");
- log.debug("Creating IdleCheck task scheduled to run every {}
millis", config.getCheckIdleInterval());
- scheduler.scheduleAtFixedRate(new IdleCheck(idleStrategy),
config.getCheckIdleInterval(), config.getCheckIdleInterval(),
TimeUnit.MILLISECONDS);
}
}
@@ -159,6 +171,20 @@ public class HdfsProducer extends Defaul
@Override
public void process(Exchange exchange) throws Exception {
+ // need to remember auth as Hadoop will override that, which otherwise
means the Auth is broken afterwards
+ Configuration auth = Configuration.getConfiguration();
+ log.trace("Existing JAAS Configuration {}", auth);
+ try {
+ doProcess(exchange);
+ } finally {
+ if (auth != null) {
+ log.trace("Restoring existing JAAS Configuration {}", auth);
+ Configuration.setConfiguration(auth);
+ }
+ }
+ }
+
+ void doProcess(Exchange exchange) throws Exception {
Object body = exchange.getIn().getBody();
Object key = exchange.getIn().getHeader(HdfsHeader.KEY.name());
Modified:
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
Thu Mar 7 14:15:27 2013
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.TypeConverter;
+import org.apache.camel.util.IOHelper;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
@@ -283,7 +284,7 @@ public class HdfsWritableFactories {
@Override
public Writable create(Object value, TypeConverter typeConverter,
Holder<Integer> size) {
- InputStream is = null;
+ InputStream is = null;
try {
is = typeConverter.convertTo(InputStream.class, value);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -295,13 +296,7 @@ public class HdfsWritableFactories {
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
} finally {
- if (is != null) {
- try {
- is.close();
- } catch (IOException e) {
- throw new RuntimeException("Error closing stream", e);
- }
- }
+ IOHelper.close(is);
}
}
Modified:
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java?rev=1453882&r1=1453881&r2=1453882&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
Thu Mar 7 14:15:27 2013
@@ -16,7 +16,7 @@
*/
package org.apache.camel.component.hdfs;
-public class Holder<T> {
+public final class Holder<T> {
/**
* The value contained in the holder.