This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 8ed7c0154f8 CAMEL-19214: camel-hdfs - Add ShortWritable support to the HDFS component (#9669) 8ed7c0154f8 is described below commit 8ed7c0154f8c4482a4359cc0eb38808d3a279637 Author: Kengo Seki <sek...@apache.org> AuthorDate: Wed Mar 29 12:59:58 2023 +0900 CAMEL-19214: camel-hdfs - Add ShortWritable support to the HDFS component (#9669) The HDFS component supports most of the Hadoop Writables corresponding to Java primitive types, but for some reason ShortWritable is the only one it does not support. This PR adds that support to the HDFS component. --- .../camel-hdfs/src/main/docs/hdfs-component.adoc | 1 + .../camel/component/hdfs/DefaultHdfsFile.java | 3 +++ .../camel/component/hdfs/HdfsWritableFactories.java | 20 ++++++++++++++++++++ .../apache/camel/component/hdfs/WritableType.java | 8 ++++++++ .../camel/component/hdfs/HdfsProducerTest.java | 21 +++++++++++++++++++++ 5 files changed, 53 insertions(+) diff --git a/components/camel-hdfs/src/main/docs/hdfs-component.adoc b/components/camel-hdfs/src/main/docs/hdfs-component.adoc index 0b1391805b7..7ee2e445340 100644 --- a/components/camel-hdfs/src/main/docs/hdfs-component.adoc +++ b/components/camel-hdfs/src/main/docs/hdfs-component.adoc @@ -75,6 +75,7 @@ include::partial$component-endpoint-headers.adoc[] * BYTE for writing a byte, the java Byte class is mapped into a BYTE * BYTES for writing a sequence of bytes. 
It maps the java ByteBuffer class +* SHORT for writing java short * INT for writing java integer * FLOAT for writing java float * LONG for writing java long diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java index f414e5223e0..2b29253e26e 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java @@ -36,6 +36,7 @@ import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.ShortWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -93,6 +94,7 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme writables.put(ByteBuffer.class, new HdfsWritableFactories.HdfsBytesWritableFactory()); writables.put(Double.class, new HdfsWritableFactories.HdfsDoubleWritableFactory()); writables.put(Float.class, new HdfsWritableFactories.HdfsFloatWritableFactory()); + writables.put(Short.class, new HdfsWritableFactories.HdfsShortWritableFactory()); writables.put(Integer.class, new HdfsWritableFactories.HdfsIntWritableFactory()); writables.put(Long.class, new HdfsWritableFactories.HdfsLongWritableFactory()); writables.put(String.class, new HdfsWritableFactories.HdfsTextWritableFactory()); @@ -105,6 +107,7 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme readables.put(BytesWritable.class, new HdfsWritableFactories.HdfsBytesWritableFactory()); readables.put(DoubleWritable.class, new HdfsWritableFactories.HdfsDoubleWritableFactory()); readables.put(FloatWritable.class, new HdfsWritableFactories.HdfsFloatWritableFactory()); + readables.put(ShortWritable.class, new 
HdfsWritableFactories.HdfsShortWritableFactory()); readables.put(IntWritable.class, new HdfsWritableFactories.HdfsIntWritableFactory()); readables.put(LongWritable.class, new HdfsWritableFactories.HdfsLongWritableFactory()); readables.put(Text.class, new HdfsWritableFactories.HdfsTextWritableFactory()); diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java index 7f2de0f4319..a53f0e1d80b 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java @@ -32,6 +32,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.ShortWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -155,6 +156,25 @@ public class HdfsWritableFactories { } } + public static final class HdfsShortWritableFactory implements HdfsWritableFactory { + + private static final int SIZE = 2; + + @Override + public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) { + size.setValue(SIZE); + ShortWritable writable = new ShortWritable(); + writable.set(typeConverter.convertTo(Short.class, value)); + return writable; + } + + @Override + public Object read(Writable writable, Holder<Integer> size) { + size.setValue(SIZE); + return ((ShortWritable) writable).get(); + } + } + public static final class HdfsIntWritableFactory implements HdfsWritableFactory { private static final int SIZE = 4; diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/WritableType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/WritableType.java index ccd416e8f8d..35d9c8af017 100644 --- 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/WritableType.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/WritableType.java @@ -24,6 +24,7 @@ import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.ShortWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; @@ -50,6 +51,13 @@ public enum WritableType { } }, + SHORT { + @Override + public Class<ShortWritable> getWritableClass() { + return ShortWritable.class; + } + }, + INT { @Override public Class<IntWritable> getWritableClass() { diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java index 3b76c091454..1b374e43270 100644 --- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java +++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java @@ -40,6 +40,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapFile; import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.ShortWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; @@ -135,6 +136,23 @@ public class HdfsProducerTest extends HdfsTestSupport { IOHelper.close(reader); } + @Test + public void testWriteShort() throws Exception { + short aShort = 32767; + template.sendBody("direct:write_short", aShort); + + Configuration conf = new Configuration(); + Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-short"); + SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file1)); + Writable key = (Writable) 
ReflectionUtils.newInstance(reader.getKeyClass(), conf); + Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); + reader.next(key, value); + short rShort = ((ShortWritable) value).get(); + assertEquals(rShort, aShort); + + IOHelper.close(reader); + } + @Test public void testWriteInt() throws Exception { int anInt = 1234; @@ -444,6 +462,9 @@ public class HdfsProducerTest extends HdfsTestSupport { from("direct:write_byte").to("hdfs:localhost/" + TEMP_DIR.toUri() + "/test-camel-byte?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE"); + from("direct:write_short").to("hdfs:localhost/" + TEMP_DIR.toUri() + + "/test-camel-short?fileSystemType=LOCAL&valueType=SHORT&fileType=SEQUENCE_FILE"); + from("direct:write_int").to("hdfs:localhost/" + TEMP_DIR.toUri() + "/test-camel-int?fileSystemType=LOCAL&valueType=INT&fileType=SEQUENCE_FILE");