This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 2e31448aab Adds per table and compaction erasure code support (#5743)
2e31448aab is described below
commit 2e31448aabd7012aef86fa21f88d8c94f185ebfc
Author: Keith Turner <[email protected]>
AuthorDate: Fri Aug 1 17:23:06 2025 -0400
Adds per table and compaction erasure code support (#5743)
Adds erasure coding (EC) support to Accumulo. The changes to RFileOperations
allow EC to be treated similarly to replication. Two new table properties
are introduced: one to enable EC on a per-table basis and one to set the
specific EC policy for the table. CompressionConfigurer was extended with
ErasureCodeConfigurer as an example of more specialized EC handling; it
enforces constraints on EC, and file size limits can be imposed so that
only larger files are erasure coded.
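For example, EC could be enabled on an existing table like this (an
illustrative sketch, not part of this commit; the table name, policy, and
client properties are hypothetical, the property keys are the ones added
here):

    static void enableEc(java.util.Properties props) throws Exception {
      try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
        // manage EC for the table and apply a specific HDFS EC policy
        client.tableOperations().setProperty("mytable", "table.file.ec", "enable");
        client.tableOperations().setProperty("mytable", "table.file.ec.policy", "RS-3-2-1024k");
      }
    }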
Co-authored-by: cawaring <[email protected]>
Co-authored-by: Dave Marion <[email protected]>
---
.../admin/compaction/CompressionConfigurer.java | 11 +-
.../admin/compaction/ErasureCodeConfigurer.java | 98 +++++++
.../org/apache/accumulo/core/conf/Property.java | 14 +
.../apache/accumulo/core/conf/PropertyType.java | 5 +-
.../apache/accumulo/core/file/rfile/EcEnabled.java | 23 ++
.../accumulo/core/file/rfile/RFileOperations.java | 44 +++-
.../accumulo/core/conf/PropertyTypeTest.java | 6 +
.../compaction/DefaultCompactionPlannerTest.java | 2 +-
.../miniclusterImpl/MiniAccumuloClusterImpl.java | 2 +-
.../miniclusterImpl/MiniAccumuloConfigImpl.java | 13 +
.../org/apache/accumulo/server/util/PropUtil.java | 26 +-
.../accumulo/test/compaction/ErasureCodeIT.java | 282 +++++++++++++++++++++
12 files changed, 514 insertions(+), 12 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompressionConfigurer.java b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompressionConfigurer.java
index b45e2d6369..263d8c4e6f 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompressionConfigurer.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompressionConfigurer.java
@@ -66,10 +66,11 @@ public class CompressionConfigurer implements CompactionConfigurer {
     if (largeThresh != null && largeCompress != null) {
       this.largeThresh = ConfigurationTypeHelper.getFixedMemoryAsBytes(largeThresh);
       this.largeCompress = largeCompress;
-    } else {
-      throw new IllegalArgumentException("Must set both of "
-          + Property.TABLE_COMPACTION_CONFIGURER_OPTS.getKey() + " (" + LARGE_FILE_COMPRESSION_TYPE
-          + " and " + LARGE_FILE_COMPRESSION_THRESHOLD + ") for " + this.getClass().getName());
+    } else if (largeThresh != null ^ largeCompress != null) {
+      throw new IllegalArgumentException(
+          "Must set both of " + Property.TABLE_COMPACTION_CONFIGURER_OPTS.getKey() + " ("
+              + LARGE_FILE_COMPRESSION_TYPE + " and " + LARGE_FILE_COMPRESSION_THRESHOLD
+              + ") or neither for " + this.getClass().getName());
     }
   }
@@ -78,7 +79,7 @@ public class CompressionConfigurer implements CompactionConfigurer {
     long inputsSum =
         params.getInputFiles().stream().mapToLong(CompactableFile::getEstimatedSize).sum();
-    if (inputsSum > largeThresh) {
+    if (largeThresh != null && inputsSum > largeThresh) {
       return new Overrides(Map.of(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), largeCompress));
     }
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/ErasureCodeConfigurer.java b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/ErasureCodeConfigurer.java
new file mode 100644
index 0000000000..a6b31564fb
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/ErasureCodeConfigurer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client.admin.compaction;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p>
+ * A compaction configurer that extends CompressionConfigurer and adds the ability to control and
+ * configure how erasure codes work. This plugin accepts the following options; when setting these
+ * options on a table, prefix them with {@code table.compaction.configurer.opts.}.
+ *
+ * <ul>
+ * <li>Set {@value #ERASURE_CODE_SIZE} to a size in bytes. The suffixes K, M, and G can be used.
+ * This is the minimum file size to allow conversion to erasure coding. If a file is below this
+ * size it will be replicated, otherwise erasure coding will be enabled.
+ * <li>Set {@value #BYPASS_ERASURE_CODES} to true or false. Setting this to true will bypass
+ * erasure codes if they are configured. This option allows user initiated compactions to bypass
+ * logic in this class if needed.
+ * <li>Optionally set {@value #ERASURE_CODE_POLICY} as the policy to use when erasure codes are
+ * used. If this is not set, then it will fall back to what is set on the table.</li>
+ * </ul>
+ *
+ * @since 2.1.4
+ */
+public class ErasureCodeConfigurer extends CompressionConfigurer {
+  public static final String ERASURE_CODE_SIZE = "erasure.code.size.conversion";
+  public static final String BYPASS_ERASURE_CODES = "erasure.code.bypass";
+  public static final String ERASURE_CODE_POLICY = "erasure.code.policy";
+  private String ecPolicyName = null;
+  private Long ecSize;
+  private Boolean byPassEC = false;
+
+  @Override
+  public void init(InitParameters iparams) {
+    var options = iparams.getOptions();
+
+    this.ecSize =
+        ConfigurationTypeHelper.getFixedMemoryAsBytes(options.getOrDefault(ERASURE_CODE_SIZE, "0"));
+    this.ecPolicyName = options.get(ERASURE_CODE_POLICY);
+    this.byPassEC = Boolean.parseBoolean(options.getOrDefault(BYPASS_ERASURE_CODES, "false"));
+
+    if (ecSize == 0 && !byPassEC) {
+      throw new IllegalArgumentException(
+          "Must set either " + ERASURE_CODE_SIZE + " or " + BYPASS_ERASURE_CODES);
+    }
+
+    if (!byPassEC) {
+      Preconditions.checkArgument(this.ecSize > 0,
+          "Must set " + ERASURE_CODE_SIZE + " to a positive integer");
+    }
+
+    super.init(iparams);
+  }
+
+  @Override
+  public Overrides override(InputParameters params) {
+    Map<String,String> overs = new HashMap<>(super.override(params).getOverrides());
+    if (this.byPassEC) {
+      // Allow user initiated compactions to pass an option to bypass EC.
+      overs.put(Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "disable");
+    } else {
+      long inputsSum =
+          params.getInputFiles().stream().mapToLong(CompactableFile::getEstimatedSize).sum();
+      if (inputsSum >= this.ecSize) {
+        overs.put(Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "enable");
+        if (ecPolicyName != null) {
+          overs.put(Property.TABLE_ERASURE_CODE_POLICY.getKey(), ecPolicyName);
+        }
+      } else {
+        overs.put(Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "disable");
+      }
+    }
+    return new Overrides(overs);
+  }
+}
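The options above are typically supplied through a user initiated compaction.
A minimal sketch of that, following the pattern used in the IT added by this
commit (the client and table name are hypothetical):

    CompactionConfig cconfig = new CompactionConfig()
        .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
            Map.of(ErasureCodeConfigurer.ERASURE_CODE_SIZE, "10K")))
        .setWait(true);
    client.tableOperations().compact("mytable", cconfig);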
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index dab9ce263f..61addfa80a 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1417,6 +1417,20 @@ public enum Property {
           + "constraint.",
       "2.0.0"),
+  TABLE_ENABLE_ERASURE_CODES("table.file.ec", "inherit", PropertyType.EC,
+      "This determines if Accumulo will manage erasure codes on a table. When this is set to"
+          + " 'enable', table.file.ec.policy must also be set and that policy will always be used"
+          + " regardless of DFS directory settings. When set to 'disable', replication will"
+          + " always be used regardless of DFS directory settings. When set to 'inherit', the"
+          + " settings from the directory in DFS will be used. Enabling erasure coding on a"
+          + " volume that does not support it is a no-op.",
+      "2.1.4"),
+
+  TABLE_ERASURE_CODE_POLICY("table.file.ec.policy", "", PropertyType.STRING,
+      "The name of the erasure code policy to use. The policy must be available and enabled in"
+          + " HDFS; to see which policies are enabled, run 'hdfs ec -listPolicies'. This setting"
+          + " is only used when table.file.ec is set to 'enable'.",
+      "2.1.4"),
   // VFS ClassLoader properties
   // this property shouldn't be used directly; it exists solely to document the default value
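A sketch of setting these properties at table creation time, mirroring the
pattern in the IT added below (the client, table name, and policy are
illustrative):

    var options = Map.of("table.file.ec", "enable",
        "table.file.ec.policy", "XOR-2-1-1024k");
    client.tableOperations().create("mytable", new NewTableConfiguration().setProperties(options));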
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
index eb01f3acfb..be90949aa0 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
@@ -160,7 +160,10 @@ public enum PropertyType {
   FILENAME_EXT("file name extension", in(true, RFile.EXTENSION),
       "One of the currently supported filename extensions for storing table data files. "
-          + "Currently, only " + RFile.EXTENSION + " is supported.");
+          + "Currently, only " + RFile.EXTENSION + " is supported."),
+
+  EC("erasurecode", in(false, "enable", "disable", "inherit"),
+      "One of 'enable', 'disable', 'inherit'.");
   private final String shortname;
   private final String format;
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/EcEnabled.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/EcEnabled.java
new file mode 100644
index 0000000000..25752d3bd6
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/EcEnabled.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+public enum EcEnabled {
+ ENABLE, DISABLE, INHERIT
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 609c19550e..c426169784 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -136,10 +137,49 @@ public class RFileOperations extends FileOperations {
     String file = options.getFilename();
     FileSystem fs = options.getFileSystem();
-    if (options.dropCacheBehind) {
+    var ecEnable = EcEnabled.valueOf(
+        options.getTableConfiguration().get(Property.TABLE_ENABLE_ERASURE_CODES).toUpperCase());
+
+    if (fs instanceof DistributedFileSystem) {
+      var builder = ((DistributedFileSystem) fs).createFile(new Path(file)).bufferSize(bufferSize)
+          .blockSize(block).overwrite(false);
+
+      if (options.dropCacheBehind) {
+        builder = builder.syncBlock();
+      }
+
+      switch (ecEnable) {
+        case ENABLE:
+          String ecPolicyName =
+              options.getTableConfiguration().get(Property.TABLE_ERASURE_CODE_POLICY);
+          // The default value of this property is the empty string. If an empty string is given
+          // to this builder it will disable erasure coding, so add an explicit check for that.
+          Preconditions.checkArgument(!ecPolicyName.isBlank(), "Blank or empty value set for %s",
+              Property.TABLE_ERASURE_CODE_POLICY.getKey());
+          builder = builder.ecPolicyName(ecPolicyName);
+          break;
+        case DISABLE:
+          // force replication
+          builder = builder.replication((short) rep).replicate();
+          break;
+        case INHERIT:
+          // use the directory settings for replication or EC
+          builder = builder.replication((short) rep);
+          break;
+        default:
+          throw new IllegalStateException(ecEnable.name());
+      }
+
+      outputStream = builder.build();
+    } else if (options.dropCacheBehind) {
       EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
       outputStream = fs.create(new Path(file), FsPermission.getDefault(), set, bufferSize,
           (short) rep, block, null);
+    } else {
+      outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block);
+    }
+
+    if (options.dropCacheBehind) {
       try {
         // Tell the DataNode that the file does not need to be cached in the OS page cache
         outputStream.setDropBehind(Boolean.TRUE);
@@ -150,8 +190,6 @@ public class RFileOperations extends FileOperations {
         LOG.debug("IOException setting drop behind for file: {}, msg: {}", options.filename,
             e.getMessage());
       }
-    } else {
-      outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block);
     }
   }
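For reference, the HDFS create-file builder used above follows this pattern
(a standalone sketch, not the committed code; the path and policy name are
illustrative):

    // Create an output stream whose file is written with an explicit EC policy.
    static FSDataOutputStream createWithEc(DistributedFileSystem dfs) throws IOException {
      return dfs.createFile(new Path("/tmp/example.rf"))
          .overwrite(false)
          .ecPolicyName("XOR-2-1-1024k") // must be enabled in HDFS
          .build();
    }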
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
index 59085382b3..46e610ea9d 100644
--- a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
@@ -242,4 +242,10 @@ public class PropertyTypeTest extends WithTestNames {
     //
     invalid(null, "", "bzip2", "lzo", "zstd");
   }
+
+  @Test
+  public void testTypeEC() {
+    valid("enable", "ENABLE", "inherit", "INHERIT", "disable", "DISABLE");
+    invalid(null, "policy", "XOR-2-1-1024k");
+  }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
index 009909f106..04bd7baa09 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
@@ -778,7 +778,7 @@ public class DefaultCompactionPlannerTest {
     return createCFs(pairs.toArray(new String[0]));
   }
-  private static Set<CompactableFile> createCFs(String... namesSizePairs) {
+  public static Set<CompactableFile> createCFs(String... namesSizePairs) {
     Set<CompactableFile> files = new HashSet<>();
     for (int i = 0; i < namesSizePairs.length; i += 2) {
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index c360bf269d..5e04cb0a25 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -199,7 +199,7 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
     conf.set("dfs.datanode.data.dir.perm", MiniDFSUtil.computeDatanodeDirectoryPermission());
     config.getHadoopConfOverrides().forEach((k, v) -> conf.set(k, v));
     String oldTestBuildData = System.setProperty("test.build.data", dfs.getAbsolutePath());
-    miniDFS.set(new MiniDFSCluster.Builder(conf).build());
+    miniDFS.set(new MiniDFSCluster.Builder(conf).numDataNodes(config.getNumDataNodes()).build());
     if (oldTestBuildData == null) {
       System.clearProperty("test.build.data");
     } else {
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
index 3cd23e301a..23337a019c 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
@@ -58,6 +58,8 @@ import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
 * Holds configuration for {@link MiniAccumuloClusterImpl}. Required configurations must be passed
 * to constructor(s) and all other configurations are optional.
@@ -111,6 +113,7 @@ public class MiniAccumuloConfigImpl {
   private Boolean existingInstance = null;
   private boolean useMiniDFS = false;
+  private int numMiniDFSDataNodes = 1;
   private boolean useCredentialProvider = false;
@@ -619,7 +622,17 @@
    * underlying miniDFS cannot be restarted.
    */
   public void useMiniDFS(boolean useMiniDFS) {
+    useMiniDFS(useMiniDFS, 1);
+  }
+
+  public void useMiniDFS(boolean useMiniDFS, int numDataNodes) {
+    Preconditions.checkArgument(numDataNodes > 0);
     this.useMiniDFS = useMiniDFS;
+    this.numMiniDFSDataNodes = numDataNodes;
+  }
+
+  public int getNumDataNodes() {
+    return numMiniDFSDataNodes;
   }
/**
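In an integration test this is used roughly as follows (a sketch based on the
IT added below; the datanode count is just an example that leaves room for
RS-3-2 style policies):

    @Override
    protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
      cfg.setNumTservers(1);
      cfg.useMiniDFS(true, 5); // five datanodes so EC policies have enough nodes
    }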
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/PropUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/PropUtil.java
index b49acae248..260e831a9a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/PropUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/PropUtil.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.server.util;
+import java.io.IOException;
import java.util.Collection;
import java.util.Map;
@@ -25,6 +26,8 @@ import org.apache.accumulo.core.classloader.ClassLoaderUtil;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.store.PropStoreKey;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
public final class PropUtil {
@@ -74,7 +77,28 @@
               "Unable to resolve classloader for context: " + prop.getValue());
         }
       }
+
+      if (prop.getKey().equals(Property.TABLE_ERASURE_CODE_POLICY.getKey())) {
+        var volumes = context.getVolumeManager().getVolumes();
+        for (var volume : volumes) {
+          if (volume.getFileSystem() instanceof DistributedFileSystem) {
+            Collection<ErasureCodingPolicyInfo> allPolicies = null;
+            try {
+              allPolicies =
+                  ((DistributedFileSystem) volume.getFileSystem()).getAllErasureCodingPolicies();
+            } catch (IOException e) {
+              throw new IllegalArgumentException("Failed to check EC policy", e);
+            }
+            if (allPolicies.stream().filter(ErasureCodingPolicyInfo::isEnabled)
+                .map(pi -> pi.getPolicy().getName())
+                .noneMatch(policy -> policy.equals(prop.getValue()))) {
+              throw new IllegalArgumentException(
+                  "EC policy " + prop.getValue() + " is not enabled in HDFS for volume "
+                      + volume.getFileSystem().getUri() + volume.getBasePath());
+            }
+          }
+        }
+      }
     }
   }
-
 }
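The check above means a policy must already be enabled in HDFS before it can
be set on a table. A sketch of verifying and enabling one first (assumes an
existing DistributedFileSystem handle; the policy name is illustrative):

    String policy = "RS-3-2-1024k";
    boolean enabled = dfs.getAllErasureCodingPolicies().stream()
        .filter(ErasureCodingPolicyInfo::isEnabled)
        .anyMatch(pi -> pi.getPolicy().getName().equals(policy));
    if (!enabled) {
      dfs.enableErasureCodingPolicy(policy); // equivalent to 'hdfs ec -enablePolicy'
    }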
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ErasureCodeIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ErasureCodeIT.java
new file mode 100644
index 0000000000..0a4672060b
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ErasureCodeIT.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.compaction;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.PluginConfig;
+import org.apache.accumulo.core.client.admin.compaction.ErasureCodeConfigurer;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.ipc.RemoteException;
+import org.junit.jupiter.api.Test;
+
+public class ErasureCodeIT extends ConfigurableMacBase {
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(1);
+    cfg.useMiniDFS(true, 5);
+  }
+
+  List<String> getECPolicies(DistributedFileSystem dfs, ClientContext ctx, String table)
+      throws Exception {
+    var ample = ctx.getAmple();
+    var tableId = ctx.getTableId(table);
+
+    var policies = new ArrayList<String>();
+
+    try (var tablets =
+        ample.readTablets().forTable(tableId).fetch(TabletMetadata.ColumnType.FILES).build()) {
+      for (var tabletMeta : tablets) {
+        for (var file : tabletMeta.getFiles()) {
+          var policy = dfs.getErasureCodingPolicy(file.getPath());
+          if (policy != null) {
+            policies.add(policy.getName());
+          } else {
+            policies.add("none");
+          }
+        }
+      }
+    }
+
+    return policies;
+  }
+
+  private Path getTableDir(ClientContext ctx, String table) throws Exception {
+    var ample = ctx.getAmple();
+    var tableId = ctx.getTableId(table);
+
+    try (var tablets =
+        ample.readTablets().forTable(tableId).fetch(TabletMetadata.ColumnType.FILES).build()) {
+      for (var tabletMeta : tablets) {
+        for (var file : tabletMeta.getFiles()) {
+          // take the tablet's first file and use that to get the dir
+          var path = file.getPath().getParent().getParent();
+          // check the assumption of the above code
+          assertTrue(path.toString().endsWith(tableId.canonical()));
+          return path;
+        }
+      }
+    }
+
+    throw new IllegalStateException("table " + table + " has no files");
+  }
+
+  @Test
+  public void test() throws Exception {
+    var names = getUniqueNames(3);
+    var table1 = names[0];
+    var table2 = names[1];
+    var table3 = names[2];
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
+
+      var policy1 = "XOR-2-1-1024k";
+      var policy2 = "RS-3-2-1024k";
+      var dfs = getCluster().getMiniDfs().getFileSystem();
+      var configuredPolicies = dfs.getAllErasureCodingPolicies().stream()
+          .map(ecpi -> ecpi.getPolicy().getName()).collect(Collectors.toSet());
+      assertTrue(configuredPolicies.contains(policy1));
+      assertTrue(configuredPolicies.contains(policy2));
+      dfs.enableErasureCodingPolicy(policy1);
+      dfs.enableErasureCodingPolicy(policy2);
+
+      var options = Map.of(Property.TABLE_ERASURE_CODE_POLICY.getKey(), policy1,
+          Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "enable");
+      c.tableOperations().create(table1, new NewTableConfiguration().setProperties(options));
+
+      var options2 = Map.of(Property.TABLE_ERASURE_CODE_POLICY.getKey(), policy1,
+          Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "inherit");
+      c.tableOperations().create(table2, new NewTableConfiguration().setProperties(options2));
+
+      var options3 = Map.of(Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "disable");
+      c.tableOperations().create(table3, new NewTableConfiguration().setProperties(options3));
+
+      SecureRandom random = new SecureRandom();
+
+      try (var writer = c.createMultiTableBatchWriter()) {
+        byte[] bytes = new byte[50_000];
+        Mutation m = new Mutation("xyx");
+        random.nextBytes(bytes);
+        m.at().family("r").qualifier("d").put(bytes);
+        writer.getBatchWriter(table1).addMutation(m);
+        writer.getBatchWriter(table2).addMutation(m);
+        writer.getBatchWriter(table3).addMutation(m);
+
+        m = new Mutation("xyz");
+        random.nextBytes(bytes);
+        m.at().family("r").qualifier("d").put(bytes);
+        writer.getBatchWriter(table1).addMutation(m);
+        writer.getBatchWriter(table2).addMutation(m);
+        writer.getBatchWriter(table3).addMutation(m);
+      }
+      c.tableOperations().flush(table1, null, null, true);
+      c.tableOperations().flush(table2, null, null, true);
+      c.tableOperations().flush(table3, null, null, true);
+
+      var ctx = ((ClientContext) c);
+
+      assertEquals(List.of(policy1), getECPolicies(dfs, ctx, table1));
+      assertEquals(List.of("none"), getECPolicies(dfs, ctx, table2));
+      assertEquals(List.of("none"), getECPolicies(dfs, ctx, table3));
+
+      // This should cause the table to compact w/o erasure coding even though it's configured on
+      // the table
+      var cconfig = new CompactionConfig()
+          .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
+              Map.of(ErasureCodeConfigurer.BYPASS_ERASURE_CODES, "true")))
+          .setWait(true);
+      c.tableOperations().compact(table1, cconfig);
+      assertEquals(List.of("none"), getECPolicies(dfs, ctx, table1));
+
+      // table2 does not have erasure coding configured, this should cause it to compact w/
+      // erasure coding because its file should be >10K.
+      cconfig = new CompactionConfig()
+          .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
+              Map.of(ErasureCodeConfigurer.ERASURE_CODE_SIZE, "10K")))
+          .setWait(true);
+      c.tableOperations().compact(table2, cconfig);
+      assertEquals(List.of(policy1), getECPolicies(dfs, ctx, table2));
+
+      // table2 has a file around 100K in size, it should not use erasure coding because it's
+      // less than 1M
+      cconfig = new CompactionConfig()
+          .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
+              Map.of(ErasureCodeConfigurer.ERASURE_CODE_SIZE, "1M")))
+          .setWait(true);
+      c.tableOperations().compact(table2, cconfig);
+      assertEquals(List.of("none"), getECPolicies(dfs, ctx, table2));
+
+      // set a different policy for this compaction than what is configured on the table
+      cconfig = new CompactionConfig()
+          .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
+              Map.of(ErasureCodeConfigurer.ERASURE_CODE_POLICY, policy2,
+                  ErasureCodeConfigurer.ERASURE_CODE_SIZE, "10K")))
+          .setWait(true);
+      c.tableOperations().compact(table1, cconfig);
+      assertEquals(List.of(policy2), getECPolicies(dfs, ctx, table1));
+
+      // table1 has erasure coding enabled for the table, this should override that and disable
+      // erasure coding for the compaction
+      cconfig = new CompactionConfig()
+          .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
+              Map.of(ErasureCodeConfigurer.ERASURE_CODE_SIZE, "1M")))
+          .setWait(true);
+      c.tableOperations().compact(table1, cconfig);
+      assertEquals(List.of("none"), getECPolicies(dfs, ctx, table1));
+
+      // add new files to the tables
+      try (var writer = c.createMultiTableBatchWriter()) {
+        byte[] bytes = new byte[10_000];
+        random.nextBytes(bytes);
+        Mutation m = new Mutation("xyx");
+        m.at().family("r2").qualifier("d").put(bytes);
+        writer.getBatchWriter(table1).addMutation(m);
+        writer.getBatchWriter(table2).addMutation(m);
+      }
+      c.tableOperations().flush(table1, null, null, true);
+      c.tableOperations().flush(table2, null, null, true);
+
+      assertEquals(List.of("none", policy1), getECPolicies(dfs, ctx, table1));
+      assertEquals(List.of("none", "none"), getECPolicies(dfs, ctx, table2));
+
+      // set the table dir erasure coding policy for all tables
+      dfs.setErasureCodingPolicy(getTableDir(ctx, table1), policy2);
+      dfs.setErasureCodingPolicy(getTableDir(ctx, table2), policy2);
+      dfs.setErasureCodingPolicy(getTableDir(ctx, table3), policy2);
+      // compact all the tables and see how setting an EC policy on the table dir influenced the
+      // files created
+      c.tableOperations().compact(table1, new CompactionConfig().setWait(true));
+      c.tableOperations().compact(table2, new CompactionConfig().setWait(true));
+      c.tableOperations().compact(table3, new CompactionConfig().setWait(true));
+      // the table settings specify policy1 so that should win
+      assertEquals(List.of(policy1), getECPolicies(dfs, ctx, table1));
+      // the table settings specify to use the dfs dir settings so that should win
+      assertEquals(List.of(policy2), getECPolicies(dfs, ctx, table2));
+      // the table settings specify to use replication so that should win
+      assertEquals(List.of("none"), getECPolicies(dfs, ctx, table3));
+
+      // unset the EC policy on all table dirs
+      dfs.unsetErasureCodingPolicy(getTableDir(ctx, table1));
+      dfs.unsetErasureCodingPolicy(getTableDir(ctx, table2));
+      dfs.unsetErasureCodingPolicy(getTableDir(ctx, table3));
+      // compact all the tables and see what happens
+      c.tableOperations().compact(table1, new CompactionConfig().setWait(true));
+      c.tableOperations().compact(table2, new CompactionConfig().setWait(true));
+      c.tableOperations().compact(table3, new CompactionConfig().setWait(true));
+      // the table settings specify policy1 so that should win
+      assertEquals(List.of(policy1), getECPolicies(dfs, ctx, table1));
+      // the table settings specify to use the dfs dir settings so that should win and it should
+      // replicate
+      assertEquals(List.of("none"), getECPolicies(dfs, ctx, table2));
+      // the table settings specify to use replication and so do the directory settings
+      assertEquals(List.of("none"), getECPolicies(dfs, ctx, table3));
+
+      // test configuring an invalid policy and ensure file creation fails
+      dfs.mkdirs(new Path("/tmp"));
+      var badOptions = Map.of(Property.TABLE_ERASURE_CODE_POLICY.getKey(), "ycilop",
+          Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "enable");
+      var exp = assertThrows(RemoteException.class, () -> RFile.newWriter().to("/tmp/test1.rf")
+          .withFileSystem(dfs).withTableProperties(badOptions).build());
+      assertTrue(exp.getMessage().contains("ycilop"));
+
+      // Enable erasure coding but do not set a policy. The default value for the policy is the
+      // empty string, so this should cause a failure.
+      var badOptions2 = Map.of(Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "enable");
+      var exp3 = assertThrows(IllegalArgumentException.class, () -> RFile.newWriter()
+          .to("/tmp/test2.rf").withFileSystem(dfs).withTableProperties(badOptions2).build());
+      assertTrue(exp3.getMessage().contains("empty")
+          && exp3.getMessage().contains(Property.TABLE_ERASURE_CODE_POLICY.getKey()));
+      // Verify assumption about default value
+      assertEquals("", Property.TABLE_ERASURE_CODE_POLICY.getDefaultValue());
+
+      // try setting an invalid EC policy on the table, should fail
+      var exp2 = assertThrows(AccumuloException.class, () -> c.tableOperations()
+          .setProperty(table1, Property.TABLE_ERASURE_CODE_POLICY.getKey(), "ycilop"));
+      assertTrue(exp2.getMessage().contains("ycilop"));
+      assertEquals(policy1, c.tableOperations().getConfiguration(table1)
+          .get(Property.TABLE_ERASURE_CODE_POLICY.getKey()));
+      // should be able to set a valid policy
+      c.tableOperations().setProperty(table1, Property.TABLE_ERASURE_CODE_POLICY.getKey(),
+          policy2);
+      assertEquals(policy2, c.tableOperations().getConfiguration(table1)
+          .get(Property.TABLE_ERASURE_CODE_POLICY.getKey()));
+    }
+  }
+}