This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 2e31448aab Adds per table and compaction erasure code support (#5743)
2e31448aab is described below

commit 2e31448aabd7012aef86fa21f88d8c94f185ebfc
Author: Keith Turner <[email protected]>
AuthorDate: Fri Aug 1 17:23:06 2025 -0400

    Adds per table and compaction erasure code support (#5743)
    
    Adds erasure code (EC) support to Accumulo.  The changes to
    RFileOperations allow EC to be treated similarly to replication.  Two
    new table properties are introduced: one to enable EC on a per-table
    basis and one to set the specific EC policy for the table.
    CompressionConfigurer was extended with ErasureCodeConfigurer as an
    example of more specialized EC handling; it enforces constraints on EC
    and can impose file size limits so that only larger files are encoded.
    
    
    Co-authored-by: cawaring <[email protected]>
    Co-authored-by: Dave Marion <[email protected]>
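
    A minimal usage sketch (table name and policy are illustrative; the
    ErasureCodeIT test in this diff exercises the same calls):

        var props = Map.of("table.file.ec", "enable",
            "table.file.ec.policy", "RS-3-2-1024k");
        client.tableOperations().create("mytable",
            new NewTableConfiguration().setProperties(props));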
---
 .../admin/compaction/CompressionConfigurer.java    |  11 +-
 .../admin/compaction/ErasureCodeConfigurer.java    |  98 +++++++
 .../org/apache/accumulo/core/conf/Property.java    |  14 +
 .../apache/accumulo/core/conf/PropertyType.java    |   5 +-
 .../apache/accumulo/core/file/rfile/EcEnabled.java |  23 ++
 .../accumulo/core/file/rfile/RFileOperations.java  |  44 +++-
 .../accumulo/core/conf/PropertyTypeTest.java       |   6 +
 .../compaction/DefaultCompactionPlannerTest.java   |   2 +-
 .../miniclusterImpl/MiniAccumuloClusterImpl.java   |   2 +-
 .../miniclusterImpl/MiniAccumuloConfigImpl.java    |  13 +
 .../org/apache/accumulo/server/util/PropUtil.java  |  26 +-
 .../accumulo/test/compaction/ErasureCodeIT.java    | 282 +++++++++++++++++++++
 12 files changed, 514 insertions(+), 12 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompressionConfigurer.java b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompressionConfigurer.java
index b45e2d6369..263d8c4e6f 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompressionConfigurer.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompressionConfigurer.java
@@ -66,10 +66,11 @@ public class CompressionConfigurer implements CompactionConfigurer {
     if (largeThresh != null && largeCompress != null) {
       this.largeThresh = ConfigurationTypeHelper.getFixedMemoryAsBytes(largeThresh);
       this.largeCompress = largeCompress;
-    } else {
-      throw new IllegalArgumentException("Must set both of "
-          + Property.TABLE_COMPACTION_CONFIGURER_OPTS.getKey() + " (" + LARGE_FILE_COMPRESSION_TYPE
-          + " and " + LARGE_FILE_COMPRESSION_THRESHOLD + ") for " + this.getClass().getName());
+    } else if (largeThresh != null ^ largeCompress != null) {
+      throw new IllegalArgumentException(
+          "Must set both of " + Property.TABLE_COMPACTION_CONFIGURER_OPTS.getKey() + " ("
+              + LARGE_FILE_COMPRESSION_TYPE + " and " + LARGE_FILE_COMPRESSION_THRESHOLD
+              + ") or neither for " + this.getClass().getName());
     }
   }
 
@@ -78,7 +79,7 @@ public class CompressionConfigurer implements CompactionConfigurer {
     long inputsSum =
         params.getInputFiles().stream().mapToLong(CompactableFile::getEstimatedSize).sum();
 
-    if (inputsSum > largeThresh) {
+    if (largeThresh != null && inputsSum > largeThresh) {
       return new Overrides(Map.of(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), largeCompress));
     }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/ErasureCodeConfigurer.java b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/ErasureCodeConfigurer.java
new file mode 100644
index 0000000000..a6b31564fb
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/ErasureCodeConfigurer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client.admin.compaction;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p>
+ * A compaction configurer that extends CompressionConfigurer and adds the ability to control and
+ * configure how erasure codes work. This plugin accepts the following options; when setting these
+ * options on a table, prefix them with {@code table.compaction.configurer.opts.}.
+ *
+ * <ul>
+ * <li>Set {@value #ERASURE_CODE_SIZE} to a size in bytes. The suffixes K, M, and G can be used.
+ * This is the minimum file size to allow conversion to erasure coding. If a file is below this
+ * size it will be replicated, otherwise erasure coding will be enabled.
+ * <li>Set {@value #BYPASS_ERASURE_CODES} to true or false. Setting this to true will bypass
+ * erasure codes if they are configured. This option allows user-initiated compactions to bypass
+ * logic in this class if needed.
+ * <li>Optionally set {@value #ERASURE_CODE_POLICY} as the policy to use when erasure codes are
+ * used. If this is not set, then it will fall back to what is set on the table.</li>
+ * </ul>
+ *
+ * @since 2.1.4
+ */
+public class ErasureCodeConfigurer extends CompressionConfigurer {
+  public static final String ERASURE_CODE_SIZE = "erasure.code.size.conversion";
+  public static final String BYPASS_ERASURE_CODES = "erasure.code.bypass";
+  public static final String ERASURE_CODE_POLICY = "erasure.code.policy";
+  private String ecPolicyName = null;
+  private Long ecSize;
+  private Boolean byPassEC = false;
+
+  @Override
+  public void init(InitParameters iparams) {
+    var options = iparams.getOptions();
+
+    this.ecSize =
+        ConfigurationTypeHelper.getFixedMemoryAsBytes(options.getOrDefault(ERASURE_CODE_SIZE, "0"));
+    this.ecPolicyName = options.get(ERASURE_CODE_POLICY);
+    this.byPassEC = Boolean.parseBoolean(options.getOrDefault(BYPASS_ERASURE_CODES, "false"));
+
+    if (ecSize == 0 && !byPassEC) {
+      throw new IllegalArgumentException(
+          "Must set either " + ERASURE_CODE_SIZE + " or " + BYPASS_ERASURE_CODES);
+    }
+
+    if (!byPassEC) {
+      Preconditions.checkArgument(this.ecSize > 0,
+          "Must set " + ERASURE_CODE_SIZE + " to a positive integer");
+    }
+
+    super.init(iparams);
+  }
+
+  @Override
+  public Overrides override(InputParameters params) {
+    Map<String,String> overs = new HashMap<>(super.override(params).getOverrides());
+    if (this.byPassEC) {
+      // Allow user-initiated compactions to pass an option to bypass EC.
+      overs.put(Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "disable");
+    } else {
+      long inputsSum =
+          params.getInputFiles().stream().mapToLong(CompactableFile::getEstimatedSize).sum();
+      if (inputsSum >= this.ecSize) {
+        overs.put(Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "enable");
+        if (ecPolicyName != null) {
+          overs.put(Property.TABLE_ERASURE_CODE_POLICY.getKey(), ecPolicyName);
+        }
+      } else {
+        overs.put(Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "disable");
+      }
+    }
+    return new Overrides(overs);
+  }
+}
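
A sketch of invoking this configurer from a user-initiated compaction (the
client and table name are illustrative; ErasureCodeIT below uses the same API):

    var cconfig = new CompactionConfig()
        .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
            Map.of(ErasureCodeConfigurer.ERASURE_CODE_SIZE, "10K")))
        .setWait(true);
    client.tableOperations().compact("mytable", cconfig);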
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index dab9ce263f..61addfa80a 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1417,6 +1417,20 @@ public enum Property {
           + "constraint.",
       "2.0.0"),
 
+  TABLE_ENABLE_ERASURE_CODES("table.file.ec", "inherit", PropertyType.EC,
+      "This determines if Accumulo will manage erasure codes on a table."
+          + " When set to 'enable', table.file.ec.policy must also be set and that policy will"
+          + " always be used regardless of DFS directory settings. When set to 'disable',"
+          + " replication will always be used regardless of DFS directory settings. When set to"
+          + " 'inherit', the settings from the directory in DFS will be used. Enabling erasure"
+          + " coding on a volume that does not support it is a no-op.",
+      "2.1.4"),
+
+  TABLE_ERASURE_CODE_POLICY("table.file.ec.policy", "", PropertyType.STRING,
+      "The name of the erasure code policy to be used. The policy must be available and enabled"
+          + " in HDFS. To see which policies are enabled, run 'hdfs ec -listPolicies'. This"
+          + " setting is only used when table.file.ec is set to 'enable'.",
+      "2.1.4"),
   // VFS ClassLoader properties
 
  // this property shouldn't be used directly; it exists solely to document the default value
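
A sketch of changing these properties on an existing table (table name is
illustrative; per the PropUtil change below, setting table.file.ec.policy is
validated against the policies enabled in HDFS):

    client.tableOperations().setProperty("mytable",
        Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "enable");
    client.tableOperations().setProperty("mytable",
        Property.TABLE_ERASURE_CODE_POLICY.getKey(), "RS-3-2-1024k");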
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
index eb01f3acfb..be90949aa0 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
@@ -160,7 +160,10 @@ public enum PropertyType {
 
   FILENAME_EXT("file name extension", in(true, RFile.EXTENSION),
       "One of the currently supported filename extensions for storing table 
data files. "
-          + "Currently, only " + RFile.EXTENSION + " is supported.");
+          + "Currently, only " + RFile.EXTENSION + " is supported."),
+
+  EC("erasurecode", in(false, "enable", "disable", "inherit"),
+      "One of 'enable','disable','inherit'.");
 
   private final String shortname;
   private final String format;
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/EcEnabled.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/EcEnabled.java
new file mode 100644
index 0000000000..25752d3bd6
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/EcEnabled.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+public enum EcEnabled {
+  ENABLE, DISABLE, INHERIT
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 609c19550e..c426169784 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -136,10 +137,49 @@ public class RFileOperations extends FileOperations {
       String file = options.getFilename();
       FileSystem fs = options.getFileSystem();
 
-      if (options.dropCacheBehind) {
+      var ecEnable = EcEnabled.valueOf(
+          options.getTableConfiguration().get(Property.TABLE_ENABLE_ERASURE_CODES).toUpperCase());
+
+      if (fs instanceof DistributedFileSystem) {
+        var builder = ((DistributedFileSystem) fs).createFile(new Path(file)).bufferSize(bufferSize)
+            .blockSize(block).overwrite(false);
+
+        if (options.dropCacheBehind) {
+          builder = builder.syncBlock();
+        }
+
+        switch (ecEnable) {
+          case ENABLE:
+            String ecPolicyName =
+                options.getTableConfiguration().get(Property.TABLE_ERASURE_CODE_POLICY);
+            // The default value of this property is the empty string. If an empty string is given
+            // to this builder it will disable erasure coding, so an explicit check is added here.
+            Preconditions.checkArgument(!ecPolicyName.isBlank(), "Blank or empty value set for %s",
+                Property.TABLE_ERASURE_CODE_POLICY.getKey());
+            builder = builder.ecPolicyName(ecPolicyName);
+            break;
+          case DISABLE:
+            // force replication
+            builder = builder.replication((short) rep).replicate();
+            break;
+          case INHERIT:
+            // use the directory settings for replication or EC
+            builder = builder.replication((short) rep);
+            break;
+          default:
+            throw new IllegalStateException(ecEnable.name());
+        }
+
+        outputStream = builder.build();
+      } else if (options.dropCacheBehind) {
         EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
         outputStream = fs.create(new Path(file), FsPermission.getDefault(), set, bufferSize,
             (short) rep, block, null);
+      } else {
+        outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block);
+      }
+
+      if (options.dropCacheBehind) {
         try {
           // Tell the DataNode that the file does not need to be cached in the OS page cache
           outputStream.setDropBehind(Boolean.TRUE);
@@ -150,8 +190,6 @@ public class RFileOperations extends FileOperations {
           LOG.debug("IOException setting drop behind for file: {}, msg: {}", 
options.filename,
               e.getMessage());
         }
-      } else {
-        outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block);
       }
     }
 
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
index 59085382b3..46e610ea9d 100644
--- a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
@@ -242,4 +242,10 @@ public class PropertyTypeTest extends WithTestNames {
     //
     invalid(null, "", "bzip2", "lzo", "zstd");
   }
+
+  @Test
+  public void testTypeEC() {
+    valid("enable", "ENABLE", "inherit", "INHERIT", "disable", "DISABLE");
+    invalid(null, "policy", "XOR-2-1-1024k");
+  }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
index 009909f106..04bd7baa09 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
@@ -778,7 +778,7 @@ public class DefaultCompactionPlannerTest {
     return createCFs(pairs.toArray(new String[0]));
   }
 
-  private static Set<CompactableFile> createCFs(String... namesSizePairs) {
+  public static Set<CompactableFile> createCFs(String... namesSizePairs) {
     Set<CompactableFile> files = new HashSet<>();
 
     for (int i = 0; i < namesSizePairs.length; i += 2) {
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index c360bf269d..5e04cb0a25 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -199,7 +199,7 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
       conf.set("dfs.datanode.data.dir.perm", MiniDFSUtil.computeDatanodeDirectoryPermission());
       config.getHadoopConfOverrides().forEach((k, v) -> conf.set(k, v));
       String oldTestBuildData = System.setProperty("test.build.data", dfs.getAbsolutePath());
-      miniDFS.set(new MiniDFSCluster.Builder(conf).build());
+      miniDFS.set(new MiniDFSCluster.Builder(conf).numDataNodes(config.getNumDataNodes()).build());
       if (oldTestBuildData == null) {
         System.clearProperty("test.build.data");
       } else {
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
index 3cd23e301a..23337a019c 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
@@ -58,6 +58,8 @@ import org.apache.zookeeper.server.ZooKeeperServerMain;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Holds configuration for {@link MiniAccumuloClusterImpl}. Required configurations must be passed
  * to constructor(s) and all other configurations are optional.
@@ -111,6 +113,7 @@ public class MiniAccumuloConfigImpl {
   private Boolean existingInstance = null;
 
   private boolean useMiniDFS = false;
+  private int numMiniDFSDataNodes = 1;
 
   private boolean useCredentialProvider = false;
 
@@ -619,7 +622,17 @@ public class MiniAccumuloConfigImpl {
    * underlying miniDFS cannot be restarted.
    */
   public void useMiniDFS(boolean useMiniDFS) {
+    useMiniDFS(useMiniDFS, 1);
+  }
+
+  public void useMiniDFS(boolean useMiniDFS, int numDataNodes) {
+    Preconditions.checkArgument(numDataNodes > 0);
     this.useMiniDFS = useMiniDFS;
+    this.numMiniDFSDataNodes = numDataNodes;
+  }
+
+  public int getNumDataNodes() {
+    return numMiniDFSDataNodes;
   }
 
   /**
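
A sketch of requesting multiple datanodes from a test (mirrors the configure
method in ErasureCodeIT below; five datanodes are enough for an RS-3-2 policy):

    @Override
    protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
      cfg.useMiniDFS(true, 5);
    }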
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/PropUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/PropUtil.java
index b49acae248..260e831a9a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/PropUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/PropUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.server.util;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 
@@ -25,6 +26,8 @@ import org.apache.accumulo.core.classloader.ClassLoaderUtil;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.conf.store.PropStoreKey;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
 
 public final class PropUtil {
 
@@ -74,7 +77,28 @@ public final class PropUtil {
               "Unable to resolve classloader for context: " + prop.getValue());
         }
       }
+
+      if (prop.getKey().equals(Property.TABLE_ERASURE_CODE_POLICY.getKey())) {
+        var volumes = context.getVolumeManager().getVolumes();
+        for (var volume : volumes) {
+          if (volume.getFileSystem() instanceof DistributedFileSystem) {
+            Collection<ErasureCodingPolicyInfo> allPolicies = null;
+            try {
+              allPolicies =
+                  ((DistributedFileSystem) volume.getFileSystem()).getAllErasureCodingPolicies();
+            } catch (IOException e) {
+              throw new IllegalArgumentException("Failed to check EC policy", e);
+            }
+            if (allPolicies.stream().filter(ErasureCodingPolicyInfo::isEnabled)
+                .map(pi -> pi.getPolicy().getName())
+                .noneMatch(policy -> policy.equals(prop.getValue()))) {
+              throw new IllegalArgumentException(
+                  "EC policy " + prop.getValue() + " is not enabled in HDFS for volume "
+                      + volume.getFileSystem().getUri() + volume.getBasePath());
+            }
+          }
+        }
+      }
     }
   }
-
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ErasureCodeIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ErasureCodeIT.java
new file mode 100644
index 0000000000..0a4672060b
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ErasureCodeIT.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.compaction;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.PluginConfig;
+import org.apache.accumulo.core.client.admin.compaction.ErasureCodeConfigurer;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.ipc.RemoteException;
+import org.junit.jupiter.api.Test;
+
+public class ErasureCodeIT extends ConfigurableMacBase {
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(1);
+    cfg.useMiniDFS(true, 5);
+  }
+
+  List<String> getECPolicies(DistributedFileSystem dfs, ClientContext ctx, String table)
+      throws Exception {
+    var ample = ctx.getAmple();
+    var tableId = ctx.getTableId(table);
+
+    var policies = new ArrayList<String>();
+
+    try (var tablets =
+        ample.readTablets().forTable(tableId).fetch(TabletMetadata.ColumnType.FILES).build()) {
+      for (var tabletMeta : tablets) {
+        for (var file : tabletMeta.getFiles()) {
+          var policy = dfs.getErasureCodingPolicy(file.getPath());
+          if (policy != null) {
+            policies.add(policy.getName());
+          } else {
+            policies.add("none");
+          }
+        }
+      }
+    }
+
+    return policies;
+  }
+
+  private Path getTableDir(ClientContext ctx, String table) throws Exception {
+    var ample = ctx.getAmple();
+    var tableId = ctx.getTableId(table);
+
+    try (var tablets =
+        ample.readTablets().forTable(tableId).fetch(TabletMetadata.ColumnType.FILES).build()) {
+      for (var tabletMeta : tablets) {
+        for (var file : tabletMeta.getFiles()) {
+          // take the tablet's first file and use that to get the dir
+          var path = file.getPath().getParent().getParent();
+          // check the assumption of the above code
+          assertTrue(path.toString().endsWith(tableId.canonical()));
+          return path;
+
+        }
+      }
+    }
+
+    throw new IllegalStateException("table " + table + " has no files");
+  }
+
+  @Test
+  public void test() throws Exception {
+    var names = getUniqueNames(3);
+    var table1 = names[0];
+    var table2 = names[1];
+    var table3 = names[2];
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
+
+      var policy1 = "XOR-2-1-1024k";
+      var policy2 = "RS-3-2-1024k";
+      var dfs = getCluster().getMiniDfs().getFileSystem();
+      var configuredPolicies = dfs.getAllErasureCodingPolicies().stream()
+          .map(ecpi -> ecpi.getPolicy().getName()).collect(Collectors.toSet());
+      assertTrue(configuredPolicies.contains(policy1));
+      assertTrue(configuredPolicies.contains(policy2));
+      dfs.enableErasureCodingPolicy(policy1);
+      dfs.enableErasureCodingPolicy(policy2);
+
+      var options = Map.of(Property.TABLE_ERASURE_CODE_POLICY.getKey(), policy1,
+          Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "enable");
+      c.tableOperations().create(table1, new NewTableConfiguration().setProperties(options));
+
+      var options2 = Map.of(Property.TABLE_ERASURE_CODE_POLICY.getKey(), policy1,
+          Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "inherit");
+      c.tableOperations().create(table2, new NewTableConfiguration().setProperties(options2));
+
+      var options3 = Map.of(Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "disable");
+      c.tableOperations().create(table3, new NewTableConfiguration().setProperties(options3));
+
+      SecureRandom random = new SecureRandom();
+
+      try (var writer = c.createMultiTableBatchWriter()) {
+        byte[] bytes = new byte[50_000];
+        Mutation m = new Mutation("xyx");
+        random.nextBytes(bytes);
+        m.at().family("r").qualifier("d").put(bytes);
+        writer.getBatchWriter(table1).addMutation(m);
+        writer.getBatchWriter(table2).addMutation(m);
+        writer.getBatchWriter(table3).addMutation(m);
+
+        m = new Mutation("xyz");
+        random.nextBytes(bytes);
+        m.at().family("r").qualifier("d").put(bytes);
+        writer.getBatchWriter(table1).addMutation(m);
+        writer.getBatchWriter(table2).addMutation(m);
+        writer.getBatchWriter(table3).addMutation(m);
+      }
+      c.tableOperations().flush(table1, null, null, true);
+      c.tableOperations().flush(table2, null, null, true);
+      c.tableOperations().flush(table3, null, null, true);
+
+      var ctx = ((ClientContext) c);
+
+      assertEquals(List.of(policy1), getECPolicies(dfs, ctx, table1));
+      assertEquals(List.of("none"), getECPolicies(dfs, ctx, table2));
+      assertEquals(List.of("none"), getECPolicies(dfs, ctx, table3));
+
+      // This should cause the table to compact w/o erasure coding even though it's configured
+      // on the table
+      var cconfig = new CompactionConfig()
+          .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
+              Map.of(ErasureCodeConfigurer.BYPASS_ERASURE_CODES, "true")))
+          .setWait(true);
+      c.tableOperations().compact(table1, cconfig);
+      assertEquals(List.of("none"), getECPolicies(dfs, ctx, table1));
+
+      // table2 does not have erasure coding configured; this should cause it to compact w/
+      // erasure coding because its file should be >10K.
+      cconfig = new CompactionConfig()
+          .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
+              Map.of(ErasureCodeConfigurer.ERASURE_CODE_SIZE, "10K")))
+          .setWait(true);
+      c.tableOperations().compact(table2, cconfig);
+      assertEquals(List.of(policy1), getECPolicies(dfs, ctx, table2));
+
+      // table2 has a file around 100K in size; it should not use erasure coding because it's
+      // less than 1M
+      cconfig = new CompactionConfig()
+          .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
+              Map.of(ErasureCodeConfigurer.ERASURE_CODE_SIZE, "1M")))
+          .setWait(true);
+      c.tableOperations().compact(table2, cconfig);
+      assertEquals(List.of("none"), getECPolicies(dfs, ctx, table2));
+
+      // set a different policy for this compaction than what is configured on the table
+      cconfig = new CompactionConfig()
+          .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
+              Map.of(ErasureCodeConfigurer.ERASURE_CODE_POLICY, policy2,
+                  ErasureCodeConfigurer.ERASURE_CODE_SIZE, "10K")))
+          .setWait(true);
+      c.tableOperations().compact(table1, cconfig);
+      assertEquals(List.of(policy2), getECPolicies(dfs, ctx, table1));
+
+      // table1 has erasure coding enabled for the table; this should override that and disable
+      // erasure coding for the compaction
+      cconfig = new CompactionConfig()
+          .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
+              Map.of(ErasureCodeConfigurer.ERASURE_CODE_SIZE, "1M")))
+          .setWait(true);
+      c.tableOperations().compact(table1, cconfig);
+      assertEquals(List.of("none"), getECPolicies(dfs, ctx, table1));
+
+      // add new files to the tables
+      try (var writer = c.createMultiTableBatchWriter()) {
+        byte[] bytes = new byte[10_000];
+        random.nextBytes(bytes);
+        Mutation m = new Mutation("xyx");
+        m.at().family("r2").qualifier("d").put(bytes);
+        writer.getBatchWriter(table1).addMutation(m);
+        writer.getBatchWriter(table2).addMutation(m);
+      }
+      c.tableOperations().flush(table1, null, null, true);
+      c.tableOperations().flush(table2, null, null, true);
+
+      assertEquals(List.of("none", policy1), getECPolicies(dfs, ctx, table1));
+      assertEquals(List.of("none", "none"), getECPolicies(dfs, ctx, table2));
+
+      // set the table dir erasure coding policy for all tables
+      dfs.setErasureCodingPolicy(getTableDir(ctx, table1), policy2);
+      dfs.setErasureCodingPolicy(getTableDir(ctx, table2), policy2);
+      dfs.setErasureCodingPolicy(getTableDir(ctx, table3), policy2);
+      // compact all the tables and see how setting an EC policy on the table dir influenced
+      // the files created
+      c.tableOperations().compact(table1, new CompactionConfig().setWait(true));
+      c.tableOperations().compact(table2, new CompactionConfig().setWait(true));
+      c.tableOperations().compact(table3, new CompactionConfig().setWait(true));
+      // the table settings specify policy1 so that should win
+      assertEquals(List.of(policy1), getECPolicies(dfs, ctx, table1));
+      // the table settings specify to use the dfs dir settings so that should win
+      assertEquals(List.of(policy2), getECPolicies(dfs, ctx, table2));
+      // the table settings specify to use replication so that should win
+      assertEquals(List.of("none"), getECPolicies(dfs, ctx, table3));
+
+      // unset the EC policy on all table dirs
+      dfs.unsetErasureCodingPolicy(getTableDir(ctx, table1));
+      dfs.unsetErasureCodingPolicy(getTableDir(ctx, table2));
+      dfs.unsetErasureCodingPolicy(getTableDir(ctx, table3));
+      // compact all the tables and see what happens
+      c.tableOperations().compact(table1, new CompactionConfig().setWait(true));
+      c.tableOperations().compact(table2, new CompactionConfig().setWait(true));
+      c.tableOperations().compact(table3, new CompactionConfig().setWait(true));
+      // the table settings specify policy1 so that should win
+      assertEquals(List.of(policy1), getECPolicies(dfs, ctx, table1));
+      // the table settings specify to use the dfs dir settings so that should win and it should
+      // replicate
+      assertEquals(List.of("none"), getECPolicies(dfs, ctx, table2));
+      // the table settings specify to use replication and so do the directory settings
+      assertEquals(List.of("none"), getECPolicies(dfs, ctx, table3));
+
+      // test configuring an invalid policy and ensure file creation fails
+      dfs.mkdirs(new Path("/tmp"));
+      var badOptions = Map.of(Property.TABLE_ERASURE_CODE_POLICY.getKey(), "ycilop",
+          Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "enable");
+      var exp = assertThrows(RemoteException.class, () -> RFile.newWriter().to("/tmp/test1.rf")
+          .withFileSystem(dfs).withTableProperties(badOptions).build());
+      assertTrue(exp.getMessage().contains("ycilop"));
+
+      // Enable erasure coding but do not set a policy. The default value for the policy is the
+      // empty string, so this should cause a failure.
+      var badOptions2 = Map.of(Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "enable");
+      var exp3 = assertThrows(IllegalArgumentException.class, () -> RFile.newWriter()
+          .to("/tmp/test2.rf").withFileSystem(dfs).withTableProperties(badOptions2).build());
+      assertTrue(exp3.getMessage().contains("empty")
+          && exp3.getMessage().contains(Property.TABLE_ERASURE_CODE_POLICY.getKey()));
+      // Verify assumption about default value
+      assertEquals("", Property.TABLE_ERASURE_CODE_POLICY.getDefaultValue());
+
+      // try setting an invalid EC policy on the table, should fail
+      var exp2 = assertThrows(AccumuloException.class, () -> c.tableOperations().setProperty(table1,
+          Property.TABLE_ERASURE_CODE_POLICY.getKey(), "ycilop"));
+      assertTrue(exp2.getMessage().contains("ycilop"));
+      assertEquals(policy1, c.tableOperations().getConfiguration(table1)
+          .get(Property.TABLE_ERASURE_CODE_POLICY.getKey()));
+      // should be able to set a valid policy
+      c.tableOperations().setProperty(table1, Property.TABLE_ERASURE_CODE_POLICY.getKey(), policy2);
+      assertEquals(policy2, c.tableOperations().getConfiguration(table1)
+          .get(Property.TABLE_ERASURE_CODE_POLICY.getKey()));
+    }
+  }
+}

