This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit c7205cd1b972fa51aadce953e8bcc3d0883eca9c Merge: e7b3c6680b 2e31448aab Author: Keith Turner <[email protected]> AuthorDate: Fri Aug 1 23:04:46 2025 +0000 Merge commit '2e31448aabd7012aef86fa21f88d8c94f185ebfc' .../admin/compaction/CompressionConfigurer.java | 11 +- .../admin/compaction/ErasureCodeConfigurer.java | 98 +++++++ .../org/apache/accumulo/core/conf/Property.java | 14 + .../apache/accumulo/core/conf/PropertyType.java | 4 +- .../apache/accumulo/core/file/rfile/EcEnabled.java | 23 ++ .../accumulo/core/file/rfile/RFileOperations.java | 44 +++- .../accumulo/core/conf/PropertyTypeTest.java | 5 + .../miniclusterImpl/MiniAccumuloClusterImpl.java | 2 +- .../miniclusterImpl/MiniAccumuloConfigImpl.java | 13 + .../org/apache/accumulo/server/util/PropUtil.java | 26 +- .../accumulo/test/compaction/ErasureCodeIT.java | 282 +++++++++++++++++++++ 11 files changed, 511 insertions(+), 11 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java index 35a98bafa2,61addfa80a..b63b6a0d9f --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@@ -1226,7 -1416,139 +1226,21 @@@ public enum Property + "also consider configuring the `" + NoDeleteConstraint.class.getName() + "` " + "constraint.", "2.0.0"), - + TABLE_ENABLE_ERASURE_CODES("table.file.ec", "inherit", PropertyType.EC, + "This determines if Accumulo will manage erasure codes on a table." + + " When setting this to 'enable' must also set erasure.code.policy and that policy will " + + "always be used regardless of DFS directory settings. When set to 'disable', replication " + + "will always be used regardless of DFS directory settings. When set to 'inherit' " + + "the settings from the directory in dfs will be used. Enabling erasure coding on a volume " + + "that does not support it is a noop.", + "2.1.4"), + + TABLE_ERASURE_CODE_POLICY("table.file.ec.policy", "", PropertyType.STRING, + "The name of the erasure code policy to be used. Policy must be available and enabled in hdfs. " + + "To view if policy is enabled check hdfs ec -listPolicies. This setting is only used when " + + "table.file.ec is set to enable.", + "2.1.4"), - // VFS ClassLoader properties - - // this property shouldn't be used directly; it exists solely to document the default value - // defined by its use in AccumuloVFSClassLoader when generating the property documentation - @Deprecated(since = "2.1.0", forRemoval = true) - VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY( - org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader.VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY, - "", PropertyType.STRING, - "Configuration for a system level vfs classloader. Accumulo jar can be" - + " configured here and loaded out of HDFS.", - "1.5.0"), - @Deprecated(since = "2.1.0", forRemoval = true) - VFS_CONTEXT_CLASSPATH_PROPERTY( - org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader.VFS_CONTEXT_CLASSPATH_PROPERTY, - null, PropertyType.PREFIX, - "Properties in this category are define a classpath. These properties" - + " start with the category prefix, followed by a context name. The value is" - + " a comma separated list of URIs. Supports full regex on filename alone." - + " For example, general.vfs.context.classpath.cx1=hdfs://nn1:9902/mylibdir/*.jar." - + " You can enable post delegation for a context, which will load classes from the" - + " context first instead of the parent first. 
Do this by setting" - + " `general.vfs.context.classpath.<name>.delegation=post`, where `<name>` is" - + " your context name. If delegation is not specified, it defaults to loading" - + " from parent classloader first.", - "1.5.0"), - - // this property shouldn't be used directly; it exists solely to document the default value - // defined by its use in AccumuloVFSClassLoader when generating the property documentation - @Deprecated(since = "2.1.0", forRemoval = true) - VFS_CLASSLOADER_CACHE_DIR( - org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader.VFS_CACHE_DIR, - "${java.io.tmpdir}", PropertyType.ABSOLUTEPATH, - "The base directory to use for the vfs cache. The actual cached files will be located" - + " in a subdirectory, `accumulo-vfs-cache-<jvmProcessName>-${user.name}`, where" - + " `<jvmProcessName>` is determined by the JVM's internal management engine." - + " The cache will keep a soft reference to all of the classes loaded in the VM." - + " This should be on local disk on each node with sufficient space.", - "1.5.0"), - - // General properties for configuring replication - @Deprecated(since = "2.1.0") - REPLICATION_PREFIX("replication.", null, PropertyType.PREFIX, - "Properties in this category affect the replication of data to other Accumulo instances.", - "1.7.0"), - @Deprecated(since = "2.1.0") - REPLICATION_PEERS("replication.peer.", null, PropertyType.PREFIX, - "Properties in this category control what systems data can be replicated to.", "1.7.0"), - @Deprecated(since = "2.1.0") - REPLICATION_PEER_USER("replication.peer.user.", null, PropertyType.PREFIX, - "The username to provide when authenticating with the given peer.", "1.7.0"), - @Sensitive - @Deprecated(since = "2.1.0") - REPLICATION_PEER_PASSWORD("replication.peer.password.", null, PropertyType.PREFIX, - "The password to provide when authenticating with the given peer.", "1.7.0"), - @Deprecated(since = "2.1.0") - REPLICATION_PEER_KEYTAB("replication.peer.keytab.", null, PropertyType.PREFIX, - "The keytab to use when authenticating with the given peer.", "1.7.0"), - @Deprecated(since = "2.1.0") - REPLICATION_NAME("replication.name", "", PropertyType.STRING, - "Name of this cluster with respect to replication. 
Used to identify this" - + " instance from other peers.", - "1.7.0"), - @Deprecated(since = "2.1.0") - REPLICATION_MAX_WORK_QUEUE("replication.max.work.queue", "1000", PropertyType.COUNT, - "Upper bound of the number of files queued for replication.", "1.7.0"), - @Deprecated(since = "2.1.0") - REPLICATION_WORK_ASSIGNMENT_SLEEP("replication.work.assignment.sleep", "30s", - PropertyType.TIMEDURATION, "Amount of time to sleep between replication work assignment.", - "1.7.0"), - @Deprecated(since = "2.1.0") - REPLICATION_WORKER_THREADS("replication.worker.threads", "4", PropertyType.COUNT, - "Size of the threadpool that each tabletserver devotes to replicating data.", "1.7.0"), - @Deprecated(since = "2.1.0") - REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", "10002", PropertyType.PORT, - "Listen port used by thrift service in tserver listening for replication.", "1.7.0"), - @Deprecated(since = "2.1.0") - REPLICATION_WORK_ATTEMPTS("replication.work.attempts", "10", PropertyType.COUNT, - "Number of attempts to try to replicate some data before giving up and" - + " letting it naturally be retried later.", - "1.7.0"), - @Deprecated(since = "2.1.0") - REPLICATION_MIN_THREADS("replication.receiver.min.threads", "1", PropertyType.COUNT, - "Minimum number of threads for replication.", "1.7.0"), - @Deprecated(since = "2.1.0") - REPLICATION_THREADCHECK("replication.receiver.threadcheck.time", "30s", PropertyType.TIMEDURATION, - "The time between adjustments of the replication thread pool.", "1.7.0"), - @Deprecated(since = "2.1.0") - REPLICATION_MAX_UNIT_SIZE("replication.max.unit.size", "64M", PropertyType.BYTES, - "Maximum size of data to send in a replication message.", "1.7.0"), - @Deprecated(since = "2.1.0") - REPLICATION_WORK_ASSIGNER("replication.work.assigner", - "org.apache.accumulo.manager.replication.UnorderedWorkAssigner", PropertyType.CLASSNAME, - "Replication WorkAssigner implementation to use.", "1.7.0"), - @Deprecated(since = "2.1.0") - REPLICATION_DRIVER_DELAY("replication.driver.delay", "0s", PropertyType.TIMEDURATION, - "Amount of time to wait before the replication work loop begins in the manager.", "1.7.0"), - @Deprecated(since = "2.1.0") - REPLICATION_WORK_PROCESSOR_DELAY("replication.work.processor.delay", "0s", - PropertyType.TIMEDURATION, - "Amount of time to wait before first checking for replication work, not" - + " useful outside of tests.", - "1.7.0"), - @Deprecated(since = "2.1.0") - REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s", - PropertyType.TIMEDURATION, - "Amount of time to wait before re-checking for replication work, not" - + " useful outside of tests.", - "1.7.0"), - @Deprecated(since = "2.1.0", forRemoval = true) - REPLICATION_TRACE_PERCENT("replication.trace.percent", "0.1", PropertyType.FRACTION, - "The sampling percentage to use for replication traces.", "1.7.0"), - @Deprecated(since = "2.1.0") - REPLICATION_RPC_TIMEOUT("replication.rpc.timeout", "2m", PropertyType.TIMEDURATION, - "Amount of time for a single replication RPC call to last before failing" - + " the attempt. 
See replication.work.attempts.", - "1.7.4"), // Compactor properties - @Experimental COMPACTOR_PREFIX("compactor.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the accumulo compactor server.", "2.1.0"), COMPACTOR_CANCEL_CHECK_INTERVAL("compactor.cancel.check.interval", "5m", diff --cc core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java index b874f3f69e,be90949aa0..58962a8048 --- a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java @@@ -159,23 -161,9 +159,25 @@@ public enum PropertyType FILENAME_EXT("file name extension", in(true, RFile.EXTENSION), "One of the currently supported filename extensions for storing table data files. " + "Currently, only " + RFile.EXTENSION + " is supported."), - + VOLUMES("volumes", new ValidVolumes(), "See instance.volumes documentation"), + FATE_USER_CONFIG(ValidUserFateConfig.NAME, new ValidUserFateConfig(), + "An arbitrary string that: 1. Represents a valid, parsable generic json object. " + + "2. the keys of the json are strings which contain a comma-separated list of fate operations. " + + "3. the values of the json are integers which represent the number of threads assigned to the fate operations. " + + "4. all possible user fate operations are present in the json. " + + "5. no fate operations are repeated."), + + FATE_META_CONFIG(ValidMetaFateConfig.NAME, new ValidMetaFateConfig(), + "An arbitrary string that: 1. Represents a valid, parsable generic json object. " + + "2. the keys of the json are strings which contain a comma-separated list of fate operations. " + + "3. the values of the json are integers which represent the number of threads assigned to the fate operations. " + + "4. all possible meta fate operations are present in the json. " + + "5. no fate operations are repeated."), + FATE_THREADPOOL_SIZE("(deprecated) Manager FATE thread pool size", new FateThreadPoolSize(), + "No format check. 
Allows any value to be set but will warn the user that the" - + " property is no longer used."); ++ + " property is no longer used."), + EC("erasurecode", in(false, "enable", "disable", "inherit"), + "One of 'enable','disable','inherit'."); private final String shortname; private final String format; diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java index 46be67bf1e,c426169784..a6c2d4a8ad --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java @@@ -42,7 -40,9 +42,8 @@@ import org.apache.hadoop.conf.Configura import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; + import org.apache.hadoop.hdfs.DistributedFileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -133,25 -134,62 +134,62 @@@ public class RFileOperations extends Fi } int bufferSize = conf.getInt("io.file.buffer.size", 4096); - String file = options.getFilename(); + TabletFile file = options.getFile(); FileSystem fs = options.getFileSystem(); - if (options.dropCacheBehind) { + var ecEnable = EcEnabled.valueOf( + options.getTableConfiguration().get(Property.TABLE_ENABLE_ERASURE_CODES).toUpperCase()); + + if (fs instanceof DistributedFileSystem) { - var builder = ((DistributedFileSystem) fs).createFile(new Path(file)).bufferSize(bufferSize) ++ var builder = ((DistributedFileSystem) fs).createFile(file.getPath()).bufferSize(bufferSize) + .blockSize(block).overwrite(false); + + if (options.dropCacheBehind) { + builder = builder.syncBlock(); + } + + switch (ecEnable) { + case ENABLE: + String ecPolicyName = + options.getTableConfiguration().get(Property.TABLE_ERASURE_CODE_POLICY); + // The default value of this property is empty string. If empty string is given to this + // builder it will disable erasure coding. So adding an explicit check for that. 
+ Preconditions.checkArgument(!ecPolicyName.isBlank(), "Blank or empty value set for %s", + Property.TABLE_ERASURE_CODE_POLICY.getKey()); + builder = builder.ecPolicyName(ecPolicyName); + break; + case DISABLE: + // force replication + builder = builder.replication((short) rep).replicate(); + break; + case INHERIT: + // use the directory settings for replication or EC + builder = builder.replication((short) rep); + break; + default: + throw new IllegalStateException(ecEnable.name()); + } + + outputStream = builder.build(); + } else if (options.dropCacheBehind) { EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE); - outputStream = fs.create(new Path(file), FsPermission.getDefault(), set, bufferSize, + outputStream = fs.create(file.getPath(), FsPermission.getDefault(), set, bufferSize, (short) rep, block, null); + } else { - outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block); ++ outputStream = fs.create(file.getPath(), false, bufferSize, (short) rep, block); + } + + if (options.dropCacheBehind) { try { // Tell the DataNode that the file does not need to be cached in the OS page cache outputStream.setDropBehind(Boolean.TRUE); - LOG.trace("Called setDropBehind(TRUE) for stream writing file {}", options.filename); + LOG.trace("Called setDropBehind(TRUE) for stream writing file {}", options.file); } catch (UnsupportedOperationException e) { - LOG.debug("setDropBehind not enabled for file: {}", options.filename); + LOG.debug("setDropBehind not enabled for file: {}", options.file); } catch (IOException e) { - LOG.debug("IOException setting drop behind for file: {}, msg: {}", options.filename, + LOG.debug("IOException setting drop behind for file: {}, msg: {}", options.file, e.getMessage()); } - } else { - outputStream = fs.create(file.getPath(), false, bufferSize, (short) rep, block); } } diff --cc minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index d97bcc10f8,5e04cb0a25..e44e28a069 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@@ -223,9 -198,8 +223,9 @@@ public class MiniAccumuloClusterImpl im conf.set("dfs.datanode.synconclose", "true"); conf.set("dfs.datanode.data.dir.perm", MiniDFSUtil.computeDatanodeDirectoryPermission()); config.getHadoopConfOverrides().forEach((k, v) -> conf.set(k, v)); - String oldTestBuildData = System.setProperty("test.build.data", dfs.getAbsolutePath()); + String oldTestBuildData = + System.setProperty("test.build.data", dfs.toAbsolutePath().toString()); - miniDFS.set(new MiniDFSCluster.Builder(conf).build()); + miniDFS.set(new MiniDFSCluster.Builder(conf).numDataNodes(config.getNumDataNodes()).build()); if (oldTestBuildData == null) { System.clearProperty("test.build.data"); } else { diff --cc minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java index 59addba1c9,23337a019c..0cbc0e889f --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java @@@ -585,11 -622,31 +588,21 @@@ public class MiniAccumuloConfigImpl * underlying miniDFS cannot be restarted. 
*/ public void useMiniDFS(boolean useMiniDFS) { + useMiniDFS(useMiniDFS, 1); + } + + public void useMiniDFS(boolean useMiniDFS, int numDataNodes) { + Preconditions.checkArgument(numDataNodes > 0); this.useMiniDFS = useMiniDFS; + this.numMiniDFSDataNodes = numDataNodes; + } + + public int getNumDataNodes() { + return numMiniDFSDataNodes; } - /** - * @return location of client conf file containing connection parameters for connecting to this - * minicluster - * - * @since 1.6.0 - */ - public File getClientConfFile() { - return new File(getConfDir(), "client.conf"); - } - public File getAccumuloPropsFile() { - return new File(getConfDir(), "accumulo.properties"); + return getConfDir().toPath().resolve("accumulo.properties").toFile(); } /** diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ErasureCodeIT.java index 0000000000,0a4672060b..01eaff31ba mode 000000,100644..100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ErasureCodeIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ErasureCodeIT.java @@@ -1,0 -1,282 +1,282 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.accumulo.test.compaction; + + import static org.junit.jupiter.api.Assertions.assertEquals; + import static org.junit.jupiter.api.Assertions.assertThrows; + import static org.junit.jupiter.api.Assertions.assertTrue; + + import java.security.SecureRandom; + import java.util.ArrayList; + import java.util.List; + import java.util.Map; + import java.util.stream.Collectors; + + import org.apache.accumulo.core.client.Accumulo; + import org.apache.accumulo.core.client.AccumuloClient; + import org.apache.accumulo.core.client.AccumuloException; + import org.apache.accumulo.core.client.admin.CompactionConfig; + import org.apache.accumulo.core.client.admin.NewTableConfiguration; + import org.apache.accumulo.core.client.admin.PluginConfig; + import org.apache.accumulo.core.client.admin.compaction.ErasureCodeConfigurer; + import org.apache.accumulo.core.client.rfile.RFile; + import org.apache.accumulo.core.clientImpl.ClientContext; + import org.apache.accumulo.core.conf.Property; + import org.apache.accumulo.core.data.Mutation; + import org.apache.accumulo.core.metadata.schema.TabletMetadata; + import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; + import org.apache.accumulo.test.functional.ConfigurableMacBase; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hdfs.DistributedFileSystem; + import org.apache.hadoop.ipc.RemoteException; + import org.junit.jupiter.api.Test; + + public class ErasureCodeIT extends ConfigurableMacBase { + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setNumTservers(1); ++ cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1); + cfg.useMiniDFS(true, 5); + } + + List<String> getECPolicies(DistributedFileSystem dfs, ClientContext ctx, String table) + throws Exception { + var ample = ctx.getAmple(); + var tableId = ctx.getTableId(table); + + var policies = new ArrayList<String>(); + + try (var tablets = + ample.readTablets().forTable(tableId).fetch(TabletMetadata.ColumnType.FILES).build()) { + for (var tabletMeta : tablets) { + for (var file : tabletMeta.getFiles()) { + var policy = dfs.getErasureCodingPolicy(file.getPath()); + if (policy != null) { + policies.add(policy.getName()); + } else { + policies.add("none"); + } + } + } + } + + return policies; + } + + private Path getTableDir(ClientContext ctx, String table) throws Exception { + var ample = ctx.getAmple(); + var tableId = ctx.getTableId(table); + + try (var tablets = + ample.readTablets().forTable(tableId).fetch(TabletMetadata.ColumnType.FILES).build()) { + for (var tabletMeta : tablets) { + for (var file : tabletMeta.getFiles()) { + // take the tablets first file and use that to get the dir + var path = file.getPath().getParent().getParent(); + // check the assumption of the above code + assertTrue(path.toString().endsWith(tableId.canonical())); + return path; + + } + } + } + + throw new IllegalStateException("table " + table + " has no files"); + } + + @Test + public void test() throws Exception { + var names = getUniqueNames(3); + var table1 = names[0]; + var table2 = names[1]; + var table3 = names[2]; + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + + var policy1 = "XOR-2-1-1024k"; + var policy2 = "RS-3-2-1024k"; + var dfs = getCluster().getMiniDfs().getFileSystem(); + var configuredPolicies = dfs.getAllErasureCodingPolicies().stream() + .map(ecpi -> ecpi.getPolicy().getName()).collect(Collectors.toSet()); + 
assertTrue(configuredPolicies.contains(policy1)); + assertTrue(configuredPolicies.contains(policy2)); + dfs.enableErasureCodingPolicy(policy1); + dfs.enableErasureCodingPolicy(policy2); + + var options = Map.of(Property.TABLE_ERASURE_CODE_POLICY.getKey(), policy1, + Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "enable"); + c.tableOperations().create(table1, new NewTableConfiguration().setProperties(options)); + + var options2 = Map.of(Property.TABLE_ERASURE_CODE_POLICY.getKey(), policy1, + Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "inherit"); + c.tableOperations().create(table2, new NewTableConfiguration().setProperties(options2)); + + var options3 = Map.of(Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "disable"); + c.tableOperations().create(table3, new NewTableConfiguration().setProperties(options3)); + + SecureRandom random = new SecureRandom(); + + try (var writer = c.createMultiTableBatchWriter()) { + byte[] bytes = new byte[50_000]; + Mutation m = new Mutation("xyx"); + random.nextBytes(bytes); + m.at().family("r").qualifier("d").put(bytes); + writer.getBatchWriter(table1).addMutation(m); + writer.getBatchWriter(table2).addMutation(m); + writer.getBatchWriter(table3).addMutation(m); + + m = new Mutation("xyz"); + random.nextBytes(bytes); + m.at().family("r").qualifier("d").put(bytes); + writer.getBatchWriter(table1).addMutation(m); + writer.getBatchWriter(table2).addMutation(m); + writer.getBatchWriter(table3).addMutation(m); + } + c.tableOperations().flush(table1, null, null, true); + c.tableOperations().flush(table2, null, null, true); + c.tableOperations().flush(table3, null, null, true); + + var ctx = ((ClientContext) c); + + assertEquals(List.of(policy1), getECPolicies(dfs, ctx, table1)); + assertEquals(List.of("none"), getECPolicies(dfs, ctx, table2)); + assertEquals(List.of("none"), getECPolicies(dfs, ctx, table3)); + + // This should cause the table to compact w/o erasure coding even though its configured on the + // table + var cconfig = new CompactionConfig() + .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(), + Map.of(ErasureCodeConfigurer.BYPASS_ERASURE_CODES, "true"))) + .setWait(true); + c.tableOperations().compact(table1, cconfig); + assertEquals(List.of("none"), getECPolicies(dfs, ctx, table1)); + + // table2 does not have erasure coding configured, this should cause it to compact w/ erasure + // coding because its file should be >10K. 
+ cconfig = new CompactionConfig() + .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(), + Map.of(ErasureCodeConfigurer.ERASURE_CODE_SIZE, "10K"))) + .setWait(true); + c.tableOperations().compact(table2, cconfig); + assertEquals(List.of(policy1), getECPolicies(dfs, ctx, table2)); + + // table2 has a file around 100K in size, it should not use erasure coding because its less + // than 1M + cconfig = new CompactionConfig() + .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(), + Map.of(ErasureCodeConfigurer.ERASURE_CODE_SIZE, "1M"))) + .setWait(true); + c.tableOperations().compact(table2, cconfig); + assertEquals(List.of("none"), getECPolicies(dfs, ctx, table1)); + + // set a different policy for this compaction than what is configured on the table + cconfig = new CompactionConfig() + .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(), + Map.of(ErasureCodeConfigurer.ERASURE_CODE_POLICY, policy2, + ErasureCodeConfigurer.ERASURE_CODE_SIZE, "10K"))) + .setWait(true); + c.tableOperations().compact(table1, cconfig); + assertEquals(List.of(policy2), getECPolicies(dfs, ctx, table1)); + + // table1 has erasure coding enabled for the table, this should override that and disable + // erasure coding for the compaction + cconfig = new CompactionConfig() + .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(), + Map.of(ErasureCodeConfigurer.ERASURE_CODE_SIZE, "1M"))) + .setWait(true); + c.tableOperations().compact(table1, cconfig); + assertEquals(List.of("none"), getECPolicies(dfs, ctx, table1)); + + // add new files to the tables + try (var writer = c.createMultiTableBatchWriter()) { + byte[] bytes = new byte[10_000]; + random.nextBytes(bytes); + Mutation m = new Mutation("xyx"); + m.at().family("r2").qualifier("d").put(bytes); + writer.getBatchWriter(table1).addMutation(m); + writer.getBatchWriter(table2).addMutation(m); + } + c.tableOperations().flush(table1, null, null, true); + c.tableOperations().flush(table2, null, null, true); + + assertEquals(List.of("none", policy1), getECPolicies(dfs, ctx, table1)); + assertEquals(List.of("none", "none"), getECPolicies(dfs, ctx, table2)); + + // set the table dir erasure coding policy for all tables + dfs.setErasureCodingPolicy(getTableDir(ctx, table1), policy2); + dfs.setErasureCodingPolicy(getTableDir(ctx, table2), policy2); + dfs.setErasureCodingPolicy(getTableDir(ctx, table3), policy2); + // compact all the tables and see how setting an EC policy on the table dir influenced the + // files created + c.tableOperations().compact(table1, new CompactionConfig().setWait(true)); + c.tableOperations().compact(table2, new CompactionConfig().setWait(true)); + c.tableOperations().compact(table3, new CompactionConfig().setWait(true)); + // the table settings specify policy1 so that should win + assertEquals(List.of(policy1), getECPolicies(dfs, ctx, table1)); + // the table settings specify to use the dfs dir settings so that should win + assertEquals(List.of(policy2), getECPolicies(dfs, ctx, table2)); + // the table setting specify to use replication so that should win + assertEquals(List.of("none"), getECPolicies(dfs, ctx, table3)); + + // unset the EC policy on all table dirs + dfs.unsetErasureCodingPolicy(getTableDir(ctx, table1)); + dfs.unsetErasureCodingPolicy(getTableDir(ctx, table2)); + dfs.unsetErasureCodingPolicy(getTableDir(ctx, table3)); + // compact all the tables and see what happens + c.tableOperations().compact(table1, new CompactionConfig().setWait(true)); + 
c.tableOperations().compact(table2, new CompactionConfig().setWait(true)); + c.tableOperations().compact(table3, new CompactionConfig().setWait(true)); + // the table settings specify policy1 so that should win + assertEquals(List.of(policy1), getECPolicies(dfs, ctx, table1)); + // the table settings specify to use the dfs dir settings so that should win and iit should + // replicate + assertEquals(List.of("none"), getECPolicies(dfs, ctx, table2)); + // the table setting specify to use replication and so do the directory settings + assertEquals(List.of("none"), getECPolicies(dfs, ctx, table3)); + + // test configuring an invalid policy and ensure file creation fails + dfs.mkdirs(new Path("/tmp")); + var badOptions = Map.of(Property.TABLE_ERASURE_CODE_POLICY.getKey(), "ycilop", + Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "enable"); + var exp = assertThrows(RemoteException.class, () -> RFile.newWriter().to("/tmp/test1.rf") + .withFileSystem(dfs).withTableProperties(badOptions).build()); + assertTrue(exp.getMessage().contains("ycilop")); + + // Enable erasure coding but do not set a policy. The default value for the policy is empty + // string, so this should cause a failure. + var badOptions2 = Map.of(Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "enable"); + var exp3 = assertThrows(IllegalArgumentException.class, () -> RFile.newWriter() + .to("/tmp/test2.rf").withFileSystem(dfs).withTableProperties(badOptions2).build()); + assertTrue(exp3.getMessage().contains("empty") + && exp3.getMessage().contains(Property.TABLE_ERASURE_CODE_POLICY.getKey())); + // Verify assumption about default value + assertEquals("", Property.TABLE_ERASURE_CODE_POLICY.getDefaultValue()); + + // try setting invalid EC policy on table, should fail + var exp2 = assertThrows(AccumuloException.class, () -> c.tableOperations().setProperty(table1, + Property.TABLE_ERASURE_CODE_POLICY.getKey(), "ycilop")); + assertTrue(exp2.getMessage().contains("ycilop")); + assertEquals(policy1, c.tableOperations().getConfiguration(table1) + .get(Property.TABLE_ERASURE_CODE_POLICY.getKey())); + // should be able to set a valid policy + c.tableOperations().setProperty(table1, Property.TABLE_ERASURE_CODE_POLICY.getKey(), policy2); + assertEquals(policy2, c.tableOperations().getConfiguration(table1) + .get(Property.TABLE_ERASURE_CODE_POLICY.getKey())); + } + } + }
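
For readers of this commit, a minimal sketch (not part of the diff above) of how a client could opt a table into the new erasure coding properties. The class name and command-line arguments are hypothetical, and the RS-6-3-1024k policy is assumed to already be enabled in HDFS (check with `hdfs ec -listPolicies`):

import java.util.Map;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.conf.Property;

public class ErasureCodeTableExample {
  public static void main(String[] args) throws Exception {
    // args[0] is an accumulo client properties file, args[1] is the table to create
    try (AccumuloClient client = Accumulo.newClient().from(args[0]).build()) {
      // literal keys are table.file.ec and table.file.ec.policy
      Map<String,String> props =
          Map.of(Property.TABLE_ENABLE_ERASURE_CODES.getKey(), "enable",
              Property.TABLE_ERASURE_CODE_POLICY.getKey(), "RS-6-3-1024k");
      // With 'enable', new files written for this table always use the configured
      // policy regardless of the HDFS directory settings.
      client.tableOperations().create(args[1],
          new NewTableConfiguration().setProperties(props));
    }
  }
}

Setting table.file.ec to 'disable' forces replication for the table's files, while 'inherit' defers to whatever erasure coding policy (if any) is set on the table's directory in DFS.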
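Similarly, a hedged sketch of using the new ErasureCodeConfigurer to control erasure coding for a single compaction, mirroring the options exercised in ErasureCodeIT (class name and arguments are again hypothetical):

import java.util.Map;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.PluginConfig;
import org.apache.accumulo.core.client.admin.compaction.ErasureCodeConfigurer;

public class ErasureCodeCompactionExample {
  public static void main(String[] args) throws Exception {
    // args[0] is an accumulo client properties file, args[1] is the table to compact
    try (AccumuloClient client = Accumulo.newClient().from(args[0]).build()) {
      // Erasure code only compaction output files of at least 10K, using a policy
      // chosen for this compaction rather than the table's configured default.
      CompactionConfig cconfig = new CompactionConfig()
          .setConfigurer(new PluginConfig(ErasureCodeConfigurer.class.getName(),
              Map.of(ErasureCodeConfigurer.ERASURE_CODE_POLICY, "RS-3-2-1024k",
                  ErasureCodeConfigurer.ERASURE_CODE_SIZE, "10K")))
          .setWait(true);
      client.tableOperations().compact(args[1], cconfig);
    }
  }
}

As the test above shows, passing ErasureCodeConfigurer.BYPASS_ERASURE_CODES=true instead forces replication for that compaction's output even when erasure coding is enabled on the table.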