This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit c7205cd1b972fa51aadce953e8bcc3d0883eca9c
Merge: e7b3c6680b 2e31448aab
Author: Keith Turner <[email protected]>
AuthorDate: Fri Aug 1 23:04:46 2025 +0000

    Merge commit '2e31448aabd7012aef86fa21f88d8c94f185ebfc'

 .../admin/compaction/CompressionConfigurer.java    |  11 +-
 .../admin/compaction/ErasureCodeConfigurer.java    |  98 +++++++
 .../org/apache/accumulo/core/conf/Property.java    |  14 +
 .../apache/accumulo/core/conf/PropertyType.java    |   4 +-
 .../apache/accumulo/core/file/rfile/EcEnabled.java |  23 ++
 .../accumulo/core/file/rfile/RFileOperations.java  |  44 +++-
 .../accumulo/core/conf/PropertyTypeTest.java       |   5 +
 .../miniclusterImpl/MiniAccumuloClusterImpl.java   |   2 +-
 .../miniclusterImpl/MiniAccumuloConfigImpl.java    |  13 +
 .../org/apache/accumulo/server/util/PropUtil.java  |  26 +-
 .../accumulo/test/compaction/ErasureCodeIT.java    | 282 +++++++++++++++++++++
 11 files changed, 511 insertions(+), 11 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 35a98bafa2,61addfa80a..b63b6a0d9f
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -1226,7 -1416,139 +1226,21 @@@ public enum Property 
            + "also consider configuring the `" + 
NoDeleteConstraint.class.getName() + "` "
            + "constraint.",
        "2.0.0"),
 -
+   TABLE_ENABLE_ERASURE_CODES("table.file.ec", "inherit", PropertyType.EC,
+       "This determines if Accumulo will manage erasure codes on a table."
+           + " When set to 'enable', table.file.ec.policy must also be set, and that policy will"
+           + " always be used regardless of DFS directory settings. When set to 'disable',"
+           + " replication will always be used regardless of DFS directory settings. When set to"
+           + " 'inherit', the settings from the directory in DFS will be used. Enabling erasure"
+           + " coding on a volume that does not support it is a no-op.",
+       "2.1.4"),
+ 
+   TABLE_ERASURE_CODE_POLICY("table.file.ec.policy", "", PropertyType.STRING,
+       "The name of the erasure code policy to use. The policy must be available and enabled"
+           + " in HDFS; run 'hdfs ec -listPolicies' to see which policies are enabled. This"
+           + " setting is only used when table.file.ec is set to 'enable'.",
+       "2.1.4"),
 -  // VFS ClassLoader properties
 -
 -  // this property shouldn't be used directly; it exists solely to document the default value
 -  // defined by its use in AccumuloVFSClassLoader when generating the property documentation
 -  @Deprecated(since = "2.1.0", forRemoval = true)
 -  VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY(
 -      org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader.VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY,
 -      "", PropertyType.STRING,
 -      "Configuration for a system level vfs classloader. Accumulo jar can be"
 -          + " configured here and loaded out of HDFS.",
 -      "1.5.0"),
 -  @Deprecated(since = "2.1.0", forRemoval = true)
 -  VFS_CONTEXT_CLASSPATH_PROPERTY(
 -      org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader.VFS_CONTEXT_CLASSPATH_PROPERTY,
 -      null, PropertyType.PREFIX,
 -      "Properties in this category are define a classpath. These properties"
 -          + " start  with the category prefix, followed by a context name. The value is"
 -          + " a comma separated list of URIs. Supports full regex on filename alone."
 -          + " For example, general.vfs.context.classpath.cx1=hdfs://nn1:9902/mylibdir/*.jar."
 -          + " You can enable post delegation for a context, which will load classes from the"
 -          + " context first instead of the parent first. Do this by setting"
 -          + " `general.vfs.context.classpath.<name>.delegation=post`, where `<name>` is"
 -          + " your context name. If delegation is not specified, it defaults to loading"
 -          + " from parent classloader first.",
 -      "1.5.0"),
 -
 -  // this property shouldn't be used directly; it exists solely to document the default value
 -  // defined by its use in AccumuloVFSClassLoader when generating the property documentation
 -  @Deprecated(since = "2.1.0", forRemoval = true)
 -  VFS_CLASSLOADER_CACHE_DIR(
 -      org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader.VFS_CACHE_DIR,
 -      "${java.io.tmpdir}", PropertyType.ABSOLUTEPATH,
 -      "The base directory to use for the vfs cache. The actual cached files will be located"
 -          + " in a subdirectory, `accumulo-vfs-cache-<jvmProcessName>-${user.name}`, where"
 -          + " `<jvmProcessName>` is determined by the JVM's internal management engine."
 -          + " The cache will keep a soft reference to all of the classes loaded in the VM."
 -          + " This should be on local disk on each node with sufficient space.",
 -      "1.5.0"),
 -
 -  // General properties for configuring replication
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_PREFIX("replication.", null, PropertyType.PREFIX,
 -      "Properties in this category affect the replication of data to other Accumulo instances.",
 -      "1.7.0"),
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_PEERS("replication.peer.", null, PropertyType.PREFIX,
 -      "Properties in this category control what systems data can be replicated to.", "1.7.0"),
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_PEER_USER("replication.peer.user.", null, PropertyType.PREFIX,
 -      "The username to provide when authenticating with the given peer.", "1.7.0"),
 -  @Sensitive
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_PEER_PASSWORD("replication.peer.password.", null, PropertyType.PREFIX,
 -      "The password to provide when authenticating with the given peer.", "1.7.0"),
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_PEER_KEYTAB("replication.peer.keytab.", null, PropertyType.PREFIX,
 -      "The keytab to use when authenticating with the given peer.", "1.7.0"),
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_NAME("replication.name", "", PropertyType.STRING,
 -      "Name of this cluster with respect to replication. Used to identify this"
 -          + " instance from other peers.",
 -      "1.7.0"),
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_MAX_WORK_QUEUE("replication.max.work.queue", "1000", PropertyType.COUNT,
 -      "Upper bound of the number of files queued for replication.", "1.7.0"),
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_WORK_ASSIGNMENT_SLEEP("replication.work.assignment.sleep", "30s",
 -      PropertyType.TIMEDURATION, "Amount of time to sleep between replication work assignment.",
 -      "1.7.0"),
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_WORKER_THREADS("replication.worker.threads", "4", PropertyType.COUNT,
 -      "Size of the threadpool that each tabletserver devotes to replicating data.", "1.7.0"),
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", "10002", PropertyType.PORT,
 -      "Listen port used by thrift service in tserver listening for replication.", "1.7.0"),
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_WORK_ATTEMPTS("replication.work.attempts", "10", PropertyType.COUNT,
 -      "Number of attempts to try to replicate some data before giving up and"
 -          + " letting it naturally be retried later.",
 -      "1.7.0"),
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_MIN_THREADS("replication.receiver.min.threads", "1", PropertyType.COUNT,
 -      "Minimum number of threads for replication.", "1.7.0"),
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_THREADCHECK("replication.receiver.threadcheck.time", "30s", PropertyType.TIMEDURATION,
 -      "The time between adjustments of the replication thread pool.", "1.7.0"),
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_MAX_UNIT_SIZE("replication.max.unit.size", "64M", PropertyType.BYTES,
 -      "Maximum size of data to send in a replication message.", "1.7.0"),
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_WORK_ASSIGNER("replication.work.assigner",
 -      "org.apache.accumulo.manager.replication.UnorderedWorkAssigner", PropertyType.CLASSNAME,
 -      "Replication WorkAssigner implementation to use.", "1.7.0"),
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_DRIVER_DELAY("replication.driver.delay", "0s", PropertyType.TIMEDURATION,
 -      "Amount of time to wait before the replication work loop begins in the manager.", "1.7.0"),
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_WORK_PROCESSOR_DELAY("replication.work.processor.delay", "0s",
 -      PropertyType.TIMEDURATION,
 -      "Amount of time to wait before first checking for replication work, not"
 -          + " useful outside of tests.",
 -      "1.7.0"),
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s",
 -      PropertyType.TIMEDURATION,
 -      "Amount of time to wait before re-checking for replication work, not"
 -          + " useful outside of tests.",
 -      "1.7.0"),
 -  @Deprecated(since = "2.1.0", forRemoval = true)
 -  REPLICATION_TRACE_PERCENT("replication.trace.percent", "0.1", PropertyType.FRACTION,
 -      "The sampling percentage to use for replication traces.", "1.7.0"),
 -  @Deprecated(since = "2.1.0")
 -  REPLICATION_RPC_TIMEOUT("replication.rpc.timeout", "2m", PropertyType.TIMEDURATION,
 -      "Amount of time for a single replication RPC call to last before failing"
 -          + " the attempt. See replication.work.attempts.",
 -      "1.7.4"),
    // Compactor properties
 -  @Experimental
    COMPACTOR_PREFIX("compactor.", null, PropertyType.PREFIX,
        "Properties in this category affect the behavior of the accumulo 
compactor server.", "2.1.0"),
    COMPACTOR_CANCEL_CHECK_INTERVAL("compactor.cancel.check.interval", "5m",
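
For context on the two properties added above, a minimal sketch of setting them
from the client API (the table name and policy name below are illustrative, and
the policy must already be enabled in HDFS):

    // Sketch, not part of this commit: create a table with Accumulo-managed
    // erasure coding. Assumes a connected AccumuloClient `client` and the
    // usual imports (java.util.Map, NewTableConfiguration).
    Map<String,String> props = Map.of(
        "table.file.ec", "enable",                // TABLE_ENABLE_ERASURE_CODES
        "table.file.ec.policy", "XOR-2-1-1024k"); // TABLE_ERASURE_CODE_POLICY
    client.tableOperations().create("ec_table",
        new NewTableConfiguration().setProperties(props));

The same pair can also be applied to an existing table via
tableOperations().setProperty(), as the IT at the end of this diff does.
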
diff --cc core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
index b874f3f69e,be90949aa0..58962a8048
--- a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
@@@ -159,23 -161,9 +159,25 @@@ public enum PropertyType 
    FILENAME_EXT("file name extension", in(true, RFile.EXTENSION),
        "One of the currently supported filename extensions for storing table 
data files. "
            + "Currently, only " + RFile.EXTENSION + " is supported."),
 -
 +  VOLUMES("volumes", new ValidVolumes(), "See instance.volumes documentation"),
 +  FATE_USER_CONFIG(ValidUserFateConfig.NAME, new ValidUserFateConfig(),
 +      "An arbitrary string that: 1. Represents a valid, parsable generic json object. "
 +          + "2. the keys of the json are strings which contain a comma-separated list of fate operations. "
 +          + "3. the values of the json are integers which represent the number of threads assigned to the fate operations. "
 +          + "4. all possible user fate operations are present in the json. "
 +          + "5. no fate operations are repeated."),
 +
 +  FATE_META_CONFIG(ValidMetaFateConfig.NAME, new ValidMetaFateConfig(),
 +      "An arbitrary string that: 1. Represents a valid, parsable generic json object. "
 +          + "2. the keys of the json are strings which contain a comma-separated list of fate operations. "
 +          + "3. the values of the json are integers which represent the number of threads assigned to the fate operations. "
 +          + "4. all possible meta fate operations are present in the json. "
 +          + "5. no fate operations are repeated."),
 +  FATE_THREADPOOL_SIZE("(deprecated) Manager FATE thread pool size", new FateThreadPoolSize(),
 +      "No format check. Allows any value to be set but will warn the user that the"
-           + " property is no longer used.");
++          + " property is no longer used."),
+   EC("erasurecode", in(false, "enable", "disable", "inherit"),
+       "One of 'enable', 'disable', 'inherit'.");
  
    private final String shortname;
    private final String format;
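
A quick note on the new EC type: in(false, ...) appears to build a
case-insensitive membership check (the boolean is the case-sensitivity flag,
as in FILENAME_EXT's in(true, ...)), which lines up with the toUpperCase()
call in RFileOperations below. A hedged sketch of the expected validation:

    // Sketch, assuming isValidFormat() and a case-insensitive matcher:
    assert PropertyType.EC.isValidFormat("inherit");  // accepted
    assert PropertyType.EC.isValidFormat("ENABLE");   // accepted if case-insensitive
    assert !PropertyType.EC.isValidFormat("always");  // rejected
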
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 46be67bf1e,c426169784..a6c2d4a8ad
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@@ -42,7 -40,9 +42,8 @@@ import org.apache.hadoop.conf.Configura
  import org.apache.hadoop.fs.CreateFlag;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
 -import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.hdfs.DistributedFileSystem;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -133,25 -134,62 +134,62 @@@ public class RFileOperations extends Fi
        }
        int bufferSize = conf.getInt("io.file.buffer.size", 4096);
  
 -      String file = options.getFilename();
 +      TabletFile file = options.getFile();
        FileSystem fs = options.getFileSystem();
  
-       if (options.dropCacheBehind) {
+       var ecEnable = EcEnabled.valueOf(
+           options.getTableConfiguration().get(Property.TABLE_ENABLE_ERASURE_CODES).toUpperCase());
+ 
+       if (fs instanceof DistributedFileSystem) {
 -        var builder = ((DistributedFileSystem) fs).createFile(new Path(file)).bufferSize(bufferSize)
++        var builder = ((DistributedFileSystem) fs).createFile(file.getPath()).bufferSize(bufferSize)
+             .blockSize(block).overwrite(false);
+ 
+         if (options.dropCacheBehind) {
+           builder = builder.syncBlock();
+         }
+ 
+         switch (ecEnable) {
+           case ENABLE:
+             String ecPolicyName =
+                 options.getTableConfiguration().get(Property.TABLE_ERASURE_CODE_POLICY);
+             // The default value of this property is empty string. If empty string is given to this
+             // builder it will disable erasure coding. So adding an explicit check for that.
+             Preconditions.checkArgument(!ecPolicyName.isBlank(), "Blank or empty value set for %s",
+                 Property.TABLE_ERASURE_CODE_POLICY.getKey());
+             builder = builder.ecPolicyName(ecPolicyName);
+             break;
+           case DISABLE:
+             // force replication
+             builder = builder.replication((short) rep).replicate();
+             break;
+           case INHERIT:
+             // use the directory settings for replication or EC
+             builder = builder.replication((short) rep);
+             break;
+           default:
+             throw new IllegalStateException(ecEnable.name());
+         }
+ 
+         outputStream = builder.build();
+       } else if (options.dropCacheBehind) {
          EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
 -        outputStream = fs.create(new Path(file), FsPermission.getDefault(), set, bufferSize,
 +        outputStream = fs.create(file.getPath(), FsPermission.getDefault(), set, bufferSize,
              (short) rep, block, null);
+       } else {
 -        outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block);
++        outputStream = fs.create(file.getPath(), false, bufferSize, (short) rep, block);
+       }
+ 
+       if (options.dropCacheBehind) {
          try {
            // Tell the DataNode that the file does not need to be cached in the OS page cache
            outputStream.setDropBehind(Boolean.TRUE);
 -          LOG.trace("Called setDropBehind(TRUE) for stream writing file {}", options.filename);
 +          LOG.trace("Called setDropBehind(TRUE) for stream writing file {}", options.file);
          } catch (UnsupportedOperationException e) {
 -          LOG.debug("setDropBehind not enabled for file: {}", options.filename);
 +          LOG.debug("setDropBehind not enabled for file: {}", options.file);
          } catch (IOException e) {
 -          LOG.debug("IOException setting drop behind for file: {}, msg: {}", options.filename,
 +          LOG.debug("IOException setting drop behind for file: {}, msg: {}", options.file,
                e.getMessage());
          }
-       } else {
-         outputStream = fs.create(file.getPath(), false, bufferSize, (short) rep, block);
        }
      }
  
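The heart of this hunk is the switch from FileSystem.create(...) to the HDFS
createFile() builder, which is what allows pinning or suppressing an EC policy
per file. A standalone sketch of the same pattern (assumes Hadoop 3.x and that
the default filesystem is HDFS; path, sizes, and policy name are illustrative):

    // Sketch: per-file erasure coding control via the HDFS output stream builder.
    var dfs = (DistributedFileSystem) FileSystem.get(new Configuration());
    var builder = dfs.createFile(new Path("/example/file.rf"))
        .bufferSize(4096).blockSize(64 * 1024 * 1024).overwrite(false);
    builder = builder.ecPolicyName("XOR-2-1-1024k");         // force this EC policy, or:
    // builder = builder.replication((short) 3).replicate(); // force replication instead
    try (FSDataOutputStream out = builder.build()) {
      out.write(new byte[] {1, 2, 3});
    }
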
diff --cc minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index d97bcc10f8,5e04cb0a25..e44e28a069
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@@ -223,9 -198,8 +223,9 @@@ public class MiniAccumuloClusterImpl im
        conf.set("dfs.datanode.synconclose", "true");
        conf.set("dfs.datanode.data.dir.perm", 
MiniDFSUtil.computeDatanodeDirectoryPermission());
        config.getHadoopConfOverrides().forEach((k, v) -> conf.set(k, v));
 -      String oldTestBuildData = System.setProperty("test.build.data", dfs.getAbsolutePath());
 +      String oldTestBuildData =
 +          System.setProperty("test.build.data", dfs.toAbsolutePath().toString());
-       miniDFS.set(new MiniDFSCluster.Builder(conf).build());
+       miniDFS.set(new MiniDFSCluster.Builder(conf).numDataNodes(config.getNumDataNodes()).build());
        if (oldTestBuildData == null) {
          System.clearProperty("test.build.data");
        } else {
diff --cc minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
index 59addba1c9,23337a019c..0cbc0e889f
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
@@@ -585,11 -622,31 +588,21 @@@ public class MiniAccumuloConfigImpl 
     * underlying miniDFS cannot be restarted.
     */
    public void useMiniDFS(boolean useMiniDFS) {
+     useMiniDFS(useMiniDFS, 1);
+   }
+ 
+   public void useMiniDFS(boolean useMiniDFS, int numDataNodes) {
+     Preconditions.checkArgument(numDataNodes > 0);
      this.useMiniDFS = useMiniDFS;
+     this.numMiniDFSDataNodes = numDataNodes;
+   }
+ 
+   public int getNumDataNodes() {
+     return numMiniDFSDataNodes;
    }
  
 -  /**
 -   * @return location of client conf file containing connection parameters for connecting to this
 -   *         minicluster
 -   *
 -   * @since 1.6.0
 -   */
 -  public File getClientConfFile() {
 -    return new File(getConfDir(), "client.conf");
 -  }
 -
    public File getAccumuloPropsFile() {
 -    return new File(getConfDir(), "accumulo.properties");
 +    return getConfDir().toPath().resolve("accumulo.properties").toFile();
    }
  
    /**
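
Why the new useMiniDFS overload matters here: an erasure coding policy needs at
least dataUnits + parityUnits datanodes (3 for XOR-2-1, 5 for RS-3-2), and the
previous single-datanode miniDFS could not host either. The IT below uses it
like this:

    // In a ConfigurableMacBase subclass:
    @Override
    protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
      cfg.useMiniDFS(true, 5); // 5 datanodes, enough for RS-3-2-1024k
    }
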
diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ErasureCodeIT.java
index 0000000000,0a4672060b..01eaff31ba
mode 000000,100644..100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ErasureCodeIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ErasureCodeIT.java
@@@ -1,0 -1,282 +1,282 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.test.compaction;
+ 
+ import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertThrows;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
+ 
+ import java.security.SecureRandom;
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.stream.Collectors;
+ 
+ import org.apache.accumulo.core.client.Accumulo;
+ import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.AccumuloException;
+ import org.apache.accumulo.core.client.admin.CompactionConfig;
+ import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+ import org.apache.accumulo.core.client.admin.PluginConfig;
+ import org.apache.accumulo.core.client.admin.compaction.ErasureCodeConfigurer;
+ import org.apache.accumulo.core.client.rfile.RFile;
+ import org.apache.accumulo.core.clientImpl.ClientContext;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.Mutation;
+ import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+ import org.apache.accumulo.test.functional.ConfigurableMacBase;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hdfs.DistributedFileSystem;
+ import org.apache.hadoop.ipc.RemoteException;
+ import org.junit.jupiter.api.Test;
+ 
+ public class ErasureCodeIT extends ConfigurableMacBase {
+   @Override
+   protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 -    cfg.setNumTservers(1);
++    cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1);
+     cfg.useMiniDFS(true, 5);
+   }
+ 
+   List<String> getECPolicies(DistributedFileSystem dfs, ClientContext ctx, String table)
+       throws Exception {
+     var ample = ctx.getAmple();
+     var tableId = ctx.getTableId(table);
+ 
+     var policies = new ArrayList<String>();
+ 
+     try (var tablets =
+         ample.readTablets().forTable(tableId).fetch(TabletMetadata.ColumnType.FILES).build()) {
+       for (var tabletMeta : tablets) {
+         for (var file : tabletMeta.getFiles()) {
+           var policy = dfs.getErasureCodingPolicy(file.getPath());
+           if (policy != null) {
+             policies.add(policy.getName());
+           } else {
+             policies.add("none");
+           }
+         }
+       }
+     }
+ 
+     return policies;
+   }
+ 
+   private Path getTableDir(ClientContext ctx, String table) throws Exception {
+     var ample = ctx.getAmple();
+     var tableId = ctx.getTableId(table);
+ 
+     try (var tablets =
+         ample.readTablets().forTable(tableId).fetch(TabletMetadata.ColumnType.FILES).build()) {
+       for (var tabletMeta : tablets) {
+         for (var file : tabletMeta.getFiles()) {
+           // take the tablet's first file and use that to get the dir
+           var path = file.getPath().getParent().getParent();
+           // check the assumption of the above code
+           assertTrue(path.toString().endsWith(tableId.canonical()));
+           return path;
+ 
+         }
+       }
+     }
+ 
+     throw new IllegalStateException("table " + table + " has no files");
+   }
+ 
+   @Test
+   public void test() throws Exception {
+     var names = getUniqueNames(3);
+     var table1 = names[0];
+     var table2 = names[1];
+     var table3 = names[2];
+     try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
+ 
+       var policy1 = "XOR-2-1-1024k";
+       var policy2 = "RS-3-2-1024k";
+       var dfs = getCluster().getMiniDfs().getFileSystem();
+       var configuredPolicies = dfs.getAllErasureCodingPolicies().stream()
+           .map(ecpi -> ecpi.getPolicy().getName()).collect(Collectors.toSet());
+       assertTrue(configuredPolicies.contains(policy1));
+       assertTrue(configuredPolicies.contains(policy2));
+       dfs.enableErasureCodingPolicy(policy1);
+       dfs.enableErasureCodingPolicy(policy2);
+ 
+       var options = Map.of(Property.TABLE_ERASURE_CODE_POLICY.getKey(), policy1,
+           Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "enable");
+       c.tableOperations().create(table1, new NewTableConfiguration().setProperties(options));
+ 
+       var options2 = Map.of(Property.TABLE_ERASURE_CODE_POLICY.getKey(), policy1,
+           Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "inherit");
+       c.tableOperations().create(table2, new NewTableConfiguration().setProperties(options2));
+ 
+       var options3 = Map.of(Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "disable");
+       c.tableOperations().create(table3, new NewTableConfiguration().setProperties(options3));
+ 
+       SecureRandom random = new SecureRandom();
+ 
+       try (var writer = c.createMultiTableBatchWriter()) {
+         byte[] bytes = new byte[50_000];
+         Mutation m = new Mutation("xyx");
+         random.nextBytes(bytes);
+         m.at().family("r").qualifier("d").put(bytes);
+         writer.getBatchWriter(table1).addMutation(m);
+         writer.getBatchWriter(table2).addMutation(m);
+         writer.getBatchWriter(table3).addMutation(m);
+ 
+         m = new Mutation("xyz");
+         random.nextBytes(bytes);
+         m.at().family("r").qualifier("d").put(bytes);
+         writer.getBatchWriter(table1).addMutation(m);
+         writer.getBatchWriter(table2).addMutation(m);
+         writer.getBatchWriter(table3).addMutation(m);
+       }
+       c.tableOperations().flush(table1, null, null, true);
+       c.tableOperations().flush(table2, null, null, true);
+       c.tableOperations().flush(table3, null, null, true);
+ 
+       var ctx = ((ClientContext) c);
+ 
+       assertEquals(List.of(policy1), getECPolicies(dfs, ctx, table1));
+       assertEquals(List.of("none"), getECPolicies(dfs, ctx, table2));
+       assertEquals(List.of("none"), getECPolicies(dfs, ctx, table3));
+ 
+       // This should cause the table to compact w/o erasure coding even though it's configured on the
+       // table
+       var cconfig = new CompactionConfig()
+           .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
+               Map.of(ErasureCodeConfigurer.BYPASS_ERASURE_CODES, "true")))
+           .setWait(true);
+       c.tableOperations().compact(table1, cconfig);
+       assertEquals(List.of("none"), getECPolicies(dfs, ctx, table1));
+ 
+       // table2 does not have erasure coding configured, this should cause it to compact w/ erasure
+       // coding because its file should be >10K.
+       cconfig = new CompactionConfig()
+           .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
+               Map.of(ErasureCodeConfigurer.ERASURE_CODE_SIZE, "10K")))
+           .setWait(true);
+       c.tableOperations().compact(table2, cconfig);
+       assertEquals(List.of(policy1), getECPolicies(dfs, ctx, table2));
+ 
+       // table2 has a file around 100K in size, it should not use erasure coding because it's less
+       // than 1M
+       cconfig = new CompactionConfig()
+           .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
+               Map.of(ErasureCodeConfigurer.ERASURE_CODE_SIZE, "1M")))
+           .setWait(true);
+       c.tableOperations().compact(table2, cconfig);
+       assertEquals(List.of("none"), getECPolicies(dfs, ctx, table1));
+ 
+       // set a different policy for this compaction than what is configured on the table
+       cconfig = new CompactionConfig()
+           .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
+               Map.of(ErasureCodeConfigurer.ERASURE_CODE_POLICY, policy2,
+                   ErasureCodeConfigurer.ERASURE_CODE_SIZE, "10K")))
+           .setWait(true);
+       c.tableOperations().compact(table1, cconfig);
+       assertEquals(List.of(policy2), getECPolicies(dfs, ctx, table1));
+ 
+       // table1 has erasure coding enabled for the table, this should override that and disable
+       // erasure coding for the compaction
+       cconfig = new CompactionConfig()
+           .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
+               Map.of(ErasureCodeConfigurer.ERASURE_CODE_SIZE, "1M")))
+           .setWait(true);
+       c.tableOperations().compact(table1, cconfig);
+       assertEquals(List.of("none"), getECPolicies(dfs, ctx, table1));
+ 
+       // add new files to the tables
+       try (var writer = c.createMultiTableBatchWriter()) {
+         byte[] bytes = new byte[10_000];
+         random.nextBytes(bytes);
+         Mutation m = new Mutation("xyx");
+         m.at().family("r2").qualifier("d").put(bytes);
+         writer.getBatchWriter(table1).addMutation(m);
+         writer.getBatchWriter(table2).addMutation(m);
+       }
+       c.tableOperations().flush(table1, null, null, true);
+       c.tableOperations().flush(table2, null, null, true);
+ 
+       assertEquals(List.of("none", policy1), getECPolicies(dfs, ctx, table1));
+       assertEquals(List.of("none", "none"), getECPolicies(dfs, ctx, table2));
+ 
+       // set the table dir erasure coding policy for all tables
+       dfs.setErasureCodingPolicy(getTableDir(ctx, table1), policy2);
+       dfs.setErasureCodingPolicy(getTableDir(ctx, table2), policy2);
+       dfs.setErasureCodingPolicy(getTableDir(ctx, table3), policy2);
+       // compact all the tables and see how setting an EC policy on the table dir influenced the
+       // files created
+       c.tableOperations().compact(table1, new CompactionConfig().setWait(true));
+       c.tableOperations().compact(table2, new CompactionConfig().setWait(true));
+       c.tableOperations().compact(table3, new CompactionConfig().setWait(true));
+       // the table settings specify policy1 so that should win
+       assertEquals(List.of(policy1), getECPolicies(dfs, ctx, table1));
+       // the table settings specify to use the dfs dir settings so that should win
+       assertEquals(List.of(policy2), getECPolicies(dfs, ctx, table2));
+       // the table setting specify to use replication so that should win
+       assertEquals(List.of("none"), getECPolicies(dfs, ctx, table3));
+ 
+       // unset the EC policy on all table dirs
+       dfs.unsetErasureCodingPolicy(getTableDir(ctx, table1));
+       dfs.unsetErasureCodingPolicy(getTableDir(ctx, table2));
+       dfs.unsetErasureCodingPolicy(getTableDir(ctx, table3));
+       // compact all the tables and see what happens
+       c.tableOperations().compact(table1, new CompactionConfig().setWait(true));
+       c.tableOperations().compact(table2, new CompactionConfig().setWait(true));
+       c.tableOperations().compact(table3, new CompactionConfig().setWait(true));
+       // the table settings specify policy1 so that should win
+       assertEquals(List.of(policy1), getECPolicies(dfs, ctx, table1));
+       // the table settings specify to use the dfs dir settings so that should win and it should
+       // replicate
+       assertEquals(List.of("none"), getECPolicies(dfs, ctx, table2));
+       // the table settings specify to use replication and so do the directory settings
+       assertEquals(List.of("none"), getECPolicies(dfs, ctx, table3));
+ 
+       // test configuring an invalid policy and ensure file creation fails
+       dfs.mkdirs(new Path("/tmp"));
+       var badOptions = Map.of(Property.TABLE_ERASURE_CODE_POLICY.getKey(), "ycilop",
+           Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "enable");
+       var exp = assertThrows(RemoteException.class, () -> RFile.newWriter().to("/tmp/test1.rf")
+           .withFileSystem(dfs).withTableProperties(badOptions).build());
+       assertTrue(exp.getMessage().contains("ycilop"));
+ 
+       // Enable erasure coding but do not set a policy. The default value for the policy is empty
+       // string, so this should cause a failure.
+       var badOptions2 = Map.of(Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "enable");
+       var exp3 = assertThrows(IllegalArgumentException.class, () -> RFile.newWriter()
+           .to("/tmp/test2.rf").withFileSystem(dfs).withTableProperties(badOptions2).build());
+       assertTrue(exp3.getMessage().contains("empty")
+           && exp3.getMessage().contains(Property.TABLE_ERASURE_CODE_POLICY.getKey()));
+       // Verify assumption about default value
+       assertEquals("", Property.TABLE_ERASURE_CODE_POLICY.getDefaultValue());
+ 
+       // try setting invalid EC policy on table, should fail
+       var exp2 = assertThrows(AccumuloException.class, () -> c.tableOperations().setProperty(table1,
+           Property.TABLE_ERASURE_CODE_POLICY.getKey(), "ycilop"));
+       assertTrue(exp2.getMessage().contains("ycilop"));
+       assertEquals(policy1, c.tableOperations().getConfiguration(table1)
+           .get(Property.TABLE_ERASURE_CODE_POLICY.getKey()));
+       // should be able to set a valid policy
+       c.tableOperations().setProperty(table1, Property.TABLE_ERASURE_CODE_POLICY.getKey(), policy2);
+       assertEquals(policy2, c.tableOperations().getConfiguration(table1)
+           .get(Property.TABLE_ERASURE_CODE_POLICY.getKey()));
+     }
+   }
+ }
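
To summarize the per-compaction knobs the test exercises, a hedged sketch of
driving the new ErasureCodeConfigurer directly (the client and table name are
illustrative; the option keys are the String constants referenced above):

    // Override the table's EC settings for one compaction: apply the given
    // policy only to output files of at least ERASURE_CODE_SIZE.
    var cfg = new CompactionConfig()
        .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
            Map.of(ErasureCodeConfigurer.ERASURE_CODE_POLICY, "RS-3-2-1024k",
                ErasureCodeConfigurer.ERASURE_CODE_SIZE, "10K")))
        .setWait(true);
    client.tableOperations().compact("ec_table", cfg);
    // Or force replication regardless of the table's settings:
    //   Map.of(ErasureCodeConfigurer.BYPASS_ERASURE_CODES, "true")
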
