This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 4e56361501 Computes correct override class and adds unit test for both Hadoop and System property configuration. (#2800) 4e56361501 is described below commit 4e56361501bf0e14016a76ca9e12e880f80b5091 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Fri Jul 8 10:24:11 2022 -0400 Computes correct override class and adds unit test for both Hadoop and System property configuration. (#2800) Closes #2416 Co-authored-by: Damon Brown <dbr...@trietop.com> --- .../file/rfile/bcfile/CompressionAlgorithm.java | 8 +- .../core/file/rfile/bcfile/CompressionTest.java | 40 +++++ .../core/file/rfile/bcfile/DummyCodec.java | 172 +++++++++++++++++++++ 3 files changed, 215 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionAlgorithm.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionAlgorithm.java index a42034db09..b049b8e5a6 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionAlgorithm.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionAlgorithm.java @@ -287,19 +287,17 @@ public class CompressionAlgorithm extends Configured { */ private CompressionCodec createNewCodec(final String codecClazzProp, final String defaultClazz, final int bufferSize, final String bufferSizeConfigOpt) { - String extClazz = null; + String clazz = defaultClazz; if (codecClazzProp != null) { - extClazz = - (getConf().get(codecClazzProp) == null ? System.getProperty(codecClazzProp) : null); + clazz = System.getProperty(codecClazzProp, getConf().get(codecClazzProp, defaultClazz)); } - String clazz = (extClazz != null) ? extClazz : defaultClazz; try { LOG.info("Trying to load codec class {}", clazz); Configuration config = new Configuration(getConf()); updateBuffer(config, bufferSizeConfigOpt, bufferSize); return (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), config); } catch (ClassNotFoundException e) { - // This is okay. + LOG.debug("Unable to load codec class {} for {}", clazz, codecClazzProp, e); } return null; } diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java index 00e5b292b3..8f38b1b30e 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java @@ -41,6 +41,7 @@ import org.apache.accumulo.core.spi.file.rfile.compression.Snappy; import org.apache.accumulo.core.spi.file.rfile.compression.ZStandard; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.SnappyCodec; import org.apache.hadoop.util.ReflectionUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -302,4 +303,43 @@ public class CompressionTest { } } + @Test + public void testHadoopCodecOverride() { + Configuration conf = new Configuration(false); + conf.set(new ZStandard().getCodecClassNameProperty(), DummyCodec.class.getName()); + CompressionAlgorithm algo = Compression.getCompressionAlgorithmByName("zstd"); + algo.setConf(conf); + CompressionCodec dummyCodec = algo.createNewCodec(4096); + assertEquals(DummyCodec.class, dummyCodec.getClass(), "Hadoop override DummyCodec not loaded"); + } + + @Test + public void testSystemPropertyCodecOverride() { + System.setProperty(new Lz4().getCodecClassNameProperty(), DummyCodec.class.getName()); + try { + CompressionAlgorithm algo = Compression.getCompressionAlgorithmByName("lz4"); + CompressionCodec dummyCodec = algo.createNewCodec(4096); + assertEquals(DummyCodec.class, dummyCodec.getClass(), + "Hadoop override DummyCodec not loaded"); + } finally { + System.clearProperty(new Lz4().getCodecClassNameProperty()); + } + } + + @Test + public void testSystemPropertyOverridesConf() { + System.setProperty(new Snappy().getCodecClassNameProperty(), DummyCodec.class.getName()); + try { + Configuration conf = new Configuration(false); + conf.set(new Snappy().getCodecClassNameProperty(), SnappyCodec.class.getName()); + CompressionAlgorithm algo = Compression.getCompressionAlgorithmByName("snappy"); + algo.setConf(conf); + CompressionCodec dummyCodec = algo.createNewCodec(4096); + assertEquals(DummyCodec.class, dummyCodec.getClass(), + "Hadoop override DummyCodec not loaded"); + } finally { + System.clearProperty(new Snappy().getCodecClassNameProperty()); + } + } + } diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/DummyCodec.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/DummyCodec.java new file mode 100644 index 0000000000..831cc2b97d --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/DummyCodec.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.file.rfile.bcfile; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; + +public class DummyCodec implements Configurable, CompressionCodec { + + Configuration conf; + + /** + * Set the configuration to be used by this object. + * + * @param conf + * the configuration object. + */ + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + /** + * Return the configuration used by this object. + * + * @return the configuration object used by this object. + */ + @Override + public Configuration getConf() { + return conf; + } + + /** + * Create a {@link CompressionOutputStream} that will write to the given {@link OutputStream}. + * + * @param out + * the location for the final output stream + * @return a stream the user can write uncompressed data to have it compressed + * @throws IOException + * unsupported operation + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Create a {@link CompressionOutputStream} that will write to the given {@link OutputStream} with + * the given {@link Compressor}. + * + * @param out + * the location for the final output stream + * @param compressor + * compressor to use + * @return a stream the user can write uncompressed data to have it compressed + * @throws IOException + * unsupported operation + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out, Compressor compressor) + throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Get the type of {@link Compressor} needed by this {@link CompressionCodec}. + * + * @return the type of compressor needed by this codec. + */ + @Override + public Class<? extends Compressor> getCompressorType() { + throw new UnsupportedOperationException(); + } + + /** + * Create a new {@link Compressor} for use by this {@link CompressionCodec}. + * + * @return a new compressor for use by this codec + */ + @Override + public Compressor createCompressor() { + throw new UnsupportedOperationException(); + } + + /** + * Create a {@link CompressionInputStream} that will read from the given input stream. + * + * @param in + * the stream to read compressed bytes from + * @return a stream to read uncompressed bytes from + * @throws IOException + * unsupported operation + */ + @Override + public CompressionInputStream createInputStream(InputStream in) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Create a {@link CompressionInputStream} that will read from the given {@link InputStream} with + * the given {@link Decompressor}. + * + * @param in + * the stream to read compressed bytes from + * @param decompressor + * decompressor to use + * @return a stream to read uncompressed bytes from + * @throws IOException + * unsupported operation + */ + @Override + public CompressionInputStream createInputStream(InputStream in, Decompressor decompressor) + throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}. + * + * @return the type of decompressor needed by this codec. + */ + @Override + public Class<? extends Decompressor> getDecompressorType() { + throw new UnsupportedOperationException(); + } + + /** + * Create a new {@link Decompressor} for use by this {@link CompressionCodec}. + * + * @return a new decompressor for use by this codec + */ + @Override + public Decompressor createDecompressor() { + throw new UnsupportedOperationException(); + } + + /** + * Get the default filename extension for this kind of compression. + * + * @return <code>.dummy</code>. + */ + @Override + public String getDefaultExtension() { + return ".dummy"; + } + +}