Repository: accumulo Updated Branches: refs/heads/master 8031dcda6 -> ef909d5fb
ACCUMULO-3178 Create example preferred volumes chooser Signed-off-by: Christopher Tubbs <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ef909d5f Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ef909d5f Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ef909d5f Branch: refs/heads/master Commit: ef909d5fb6ce0eb83d35ae1e702885fb9b98cf94 Parents: 3f44f8c Author: Jenna Huston <[email protected]> Authored: Tue Oct 28 09:29:52 2014 -0400 Committer: Christopher Tubbs <[email protected]> Committed: Fri Dec 5 20:04:22 2014 -0500 ---------------------------------------------------------------------- .../server/fs/PreferredVolumeChooser.java | 80 ++++ .../accumulo/server/fs/RandomVolumeChooser.java | 2 +- .../apache/accumulo/test/VolumeChooserIT.java | 395 +++++++++++++++++++ 3 files changed, 476 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef909d5f/server/base/src/main/java/org/apache/accumulo/server/fs/PreferredVolumeChooser.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/PreferredVolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/PreferredVolumeChooser.java new file mode 100644 index 0000000..7ed7bba --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/PreferredVolumeChooser.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.fs; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.conf.AccumuloConfiguration.PropertyFilter; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfigurationFactory; +import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.log4j.Logger; + +public class PreferredVolumeChooser extends RandomVolumeChooser implements VolumeChooser { + private static final Logger log = Logger.getLogger(PreferredVolumeChooser.class); + + public static final String PREFERRED_VOLUMES_CUSTOM_KEY = Property.TABLE_ARBITRARY_PROP_PREFIX.getKey() + "preferredVolumes"; + + public PreferredVolumeChooser() {} + + @Override + public String choose(VolumeChooserEnvironment env, String[] options) { + if (!env.hasTableId()) + return super.choose(env, options); + + // Get the current table's properties, and find the preferred volumes property + TableConfiguration config = new ServerConfigurationFactory(HdfsZooInstance.getInstance()).getTableConfiguration(env.getTableId()); + PropertyFilter filter = new PropertyFilter() { + @Override + public boolean accept(String key) { + return PREFERRED_VOLUMES_CUSTOM_KEY.equals(key); + } + }; + Map<String,String> props = new HashMap<>(); + config.getProperties(props, filter); + if (props.isEmpty()) { + log.warn("No preferred volumes specified. 
Defaulting to randomly choosing from instance volumes"); + return super.choose(env, options); + } + String volumes = props.get(PREFERRED_VOLUMES_CUSTOM_KEY); + log.trace("In custom chooser"); + log.trace("Volumes: " + volumes); + log.trace("TableID: " + env.getTableId()); + + ArrayList<String> prefVol = new ArrayList<String>(); + // If the preferred volumes property is specified, split the returned string by the comma and add them to a preferred volumes list + prefVol.addAll(Arrays.asList(volumes.split(","))); + + // Change the given array to a List and only keep the preferred volumes that are in the given array. + prefVol.retainAll(Arrays.asList(options)); + + // If there are no preferred volumes left, then warn the user and choose randomly from the instance volumes + if (prefVol.isEmpty()) { + log.warn("Preferred volumes are not instance volumes. Defaulting to randomly choosing from instance volumes"); + return super.choose(env, options); + } + + // Randomly choose the volume from the preferred volumes + String choice = prefVol.get(random.nextInt(prefVol.size())); + log.trace("Choice = " + choice); + return choice; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef909d5f/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java index 85d4e2b..f2eb211 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java @@ -19,7 +19,7 @@ package org.apache.accumulo.server.fs; import java.util.Random; public class RandomVolumeChooser implements VolumeChooser { - private static Random random = new Random(); + protected static Random random = new Random(); @Override public String 
choose(VolumeChooserEnvironment env, String[] options) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef909d5f/test/src/test/java/org/apache/accumulo/test/VolumeChooserIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/VolumeChooserIT.java b/test/src/test/java/org/apache/accumulo/test/VolumeChooserIT.java new file mode 100644 index 0000000..8ca141b --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/VolumeChooserIT.java @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.fs.PerTableVolumeChooser; +import org.apache.accumulo.server.fs.PreferredVolumeChooser; +import org.apache.accumulo.server.fs.RandomVolumeChooser; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +/** + * + */ +public class VolumeChooserIT extends ConfigurableMacIT { + + private static final Text EMPTY = new Text(); + private static final Value EMPTY_VALUE = new Value(new byte[] {}); + private File volDirBase; + private Path v1, v2, v3, 
v4; + private String[] rows = "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z".split(","); + private String namespace1; + private String namespace2; + + @Override + protected int defaultTimeoutSeconds() { + return 30; + } + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + // Get 2 tablet servers + cfg.setNumTservers(2); + namespace1 = "ns_" + getUniqueNames(2)[0]; + namespace2 = "ns_" + getUniqueNames(2)[1]; + + // Set the general volume chooser to the PerTableVolumeChooser so that different choosers can be specified + Map<String,String> siteConfig = new HashMap<String,String>(); + siteConfig.put(Property.GENERAL_VOLUME_CHOOSER.getKey(), PerTableVolumeChooser.class.getName()); + cfg.setSiteConfig(siteConfig); + + // Set up 4 different volume paths + File baseDir = cfg.getDir(); + volDirBase = new File(baseDir, "volumes"); + File v1f = new File(volDirBase, "v1"); + File v2f = new File(volDirBase, "v2"); + File v3f = new File(volDirBase, "v3"); + File v4f = new File(volDirBase, "v4"); + // NOTE(review): mkdir() does not create a missing parent and its result is ignored here; nothing visible creates the "volumes" directory first - confirm whether these calls are needed + v1f.mkdir(); + v2f.mkdir(); + v4f.mkdir(); + v1 = new Path("file://" + v1f.getAbsolutePath()); + v2 = new Path("file://" + v2f.getAbsolutePath()); + v3 = new Path("file://" + v3f.getAbsolutePath()); + v4 = new Path("file://" + v4f.getAbsolutePath()); + + // Only add volumes 1, 2, and 4 to the list of instance volumes to have one volume that isn't in the options list when they are choosing + cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString() + "," + v4.toString()); + + // use raw local file system so walogs sync and flush will work + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + + super.configure(cfg, hadoopCoreSite); + + } + + public void addSplits(Connector connector, String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + // Add 10 splits to the table + SortedSet<Text> partitions = new TreeSet<Text>(); + for (String s : 
"b,e,g,j,l,o,q,t,v,y".split(",")) + partitions.add(new Text(s)); + connector.tableOperations().addSplits(tableName, partitions); + } + + public void writeAndReadData(Connector connector, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + // Write one empty-valued entry per row in 'rows' to the table + BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig()); + for (String s : rows) { + Mutation m = new Mutation(new Text(s)); + m.put(EMPTY, EMPTY, EMPTY_VALUE); + bw.addMutation(m); + } + bw.close(); + + // Write the data to disk, read it back + connector.tableOperations().flush(tableName, null, null, true); + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + int i = 0; + for (Entry<Key,Value> entry : scanner) { + assertEquals("Data read is not data written", rows[i++], entry.getKey().getRow().toString()); + } + // Every written row must come back; without this check an empty or truncated scan would pass silently + assertEquals("Wrong number of rows read", rows.length, i); + } + + public void verifyVolumes(Connector connector, String tableName, Range tableRange, String vol) throws TableNotFoundException { + // Verify the new files are written to the Volumes specified + ArrayList<String> volumes = new ArrayList<String>(); + for (String s : vol.split(",")) + volumes.add(s); + + Scanner scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + scanner.setRange(tableRange); + scanner.fetchColumnFamily(DataFileColumnFamily.NAME); + int fileCount = 0; + for (Entry<Key,Value> entry : scanner) { + boolean inVolume = false; + for (String volume : volumes) { + if (entry.getKey().getColumnQualifier().toString().contains(volume)) + inVolume = true; + } + assertTrue("Data not written to the correct volumes", inVolume); + fileCount++; + } + assertEquals("Wrong number of files", 11, fileCount); + } + + // Test that uses two tables with 10 split points each. They each use the PreferredVolumeChooser to choose volumes. 
+ @Test + public void twoTablesPreferredVolumeChooser() throws Exception { + log.info("Starting twoTablesPreferredVolumeChooser"); + + // Create namespace + Connector connector = getConnector(); + connector.namespaceOperations().create(namespace1); + + // Select the PreferredVolumeChooser for every table in the namespace + String propertyName = Property.TABLE_VOLUME_CHOOSER.getKey(); + String volume = PreferredVolumeChooser.class.getName(); + connector.namespaceOperations().setProperty(namespace1, propertyName, volume); + + // Restrict the namespace to v2, using the key constant the chooser itself reads + propertyName = PreferredVolumeChooser.PREFERRED_VOLUMES_CUSTOM_KEY; + volume = v2.toString(); + connector.namespaceOperations().setProperty(namespace1, propertyName, volume); + + // Create table1 on namespace1 + String tableName = namespace1 + ".1"; + connector.tableOperations().create(tableName); + String tableID = connector.tableOperations().tableIdMap().get(tableName); + + // Add 10 splits to the table + addSplits(connector, tableName); + // Write some data to the table + writeAndReadData(connector, tableName); + // Verify the new files are written to the Volumes specified + verifyVolumes(connector, tableName, TabletsSection.getRange(tableID), volume); + + connector.namespaceOperations().create(namespace2); + + // Select the PreferredVolumeChooser for every table in the second namespace + propertyName = Property.TABLE_VOLUME_CHOOSER.getKey(); + volume = PreferredVolumeChooser.class.getName(); + connector.namespaceOperations().setProperty(namespace2, propertyName, volume); + + // Restrict the second namespace to v1 + propertyName = PreferredVolumeChooser.PREFERRED_VOLUMES_CUSTOM_KEY; + volume = v1.toString(); + connector.namespaceOperations().setProperty(namespace2, propertyName, volume); + + // Create table2 on namespace2 + String tableName2 = namespace2 + ".1"; + + connector.tableOperations().create(tableName2); + String tableID2 = connector.tableOperations().tableIdMap().get(tableName2); + + // Add 10 splits to the table + addSplits(connector, tableName2); + // Write some data to the table + writeAndReadData(connector, tableName2); + // Verify the new files are written to the Volumes specified + 
verifyVolumes(connector, tableName2, TabletsSection.getRange(tableID2), volume); + } + + // Test that uses two tables with 10 split points each. They each use the RandomVolumeChooser to choose volumes. + @Test + public void twoTablesRandomVolumeChooser() throws Exception { + log.info("Starting twoTablesRandomVolumeChooser()"); + + // Create namespace + Connector connector = getConnector(); + connector.namespaceOperations().create(namespace1); + + // Set properties on the namespace + String propertyName = Property.TABLE_VOLUME_CHOOSER.getKey(); + String volume = RandomVolumeChooser.class.getName(); + connector.namespaceOperations().setProperty(namespace1, propertyName, volume); + + // Create table1 on namespace1 + String tableName = namespace1 + ".1"; + connector.tableOperations().create(tableName); + String tableID = connector.tableOperations().tableIdMap().get(tableName); + + // Add 10 splits to the table + addSplits(connector, tableName); + // Write some data to the table + writeAndReadData(connector, tableName); + // Verify the new files are written to the Volumes specified + + verifyVolumes(connector, tableName, TabletsSection.getRange(tableID), v1.toString() + "," + v2.toString() + "," + v4.toString()); + + connector.namespaceOperations().create(namespace2); + + // Set properties on the namespace + propertyName = Property.TABLE_VOLUME_CHOOSER.getKey(); + volume = RandomVolumeChooser.class.getName(); + connector.namespaceOperations().setProperty(namespace2, propertyName, volume); + + // Create table2 on namespace2 + String tableName2 = namespace2 + ".1"; + connector.tableOperations().create(tableName2); + // Look up the id of the second table (not the first) so that its own files are the ones verified below + String tableID2 = connector.tableOperations().tableIdMap().get(tableName2); + + // Add 10 splits to the table + addSplits(connector, tableName2); + // Write some data to the table + writeAndReadData(connector, tableName2); + // Verify the new files are written to the Volumes specified + verifyVolumes(connector, tableName2, TabletsSection.getRange(tableID2), 
v1.toString() + "," + v2.toString() + "," + v4.toString()); + } + + // Test that uses two tables with 10 split points each. The first uses the RandomVolumeChooser and the second uses the + // PreferredVolumeChooser to choose volumes. + @Test + public void twoTablesDiffChoosers() throws Exception { + log.info("Starting twoTablesDiffChoosers"); + + // Create namespace + Connector connector = getConnector(); + connector.namespaceOperations().create(namespace1); + + // Select the RandomVolumeChooser for every table in the first namespace + String propertyName = Property.TABLE_VOLUME_CHOOSER.getKey(); + String volume = RandomVolumeChooser.class.getName(); + connector.namespaceOperations().setProperty(namespace1, propertyName, volume); + + // Create table1 on namespace1 + String tableName = namespace1 + ".1"; + connector.tableOperations().create(tableName); + String tableID = connector.tableOperations().tableIdMap().get(tableName); + + // Add 10 splits to the table + addSplits(connector, tableName); + // Write some data to the table + writeAndReadData(connector, tableName); + // Verify the new files are written to the Volumes specified + + verifyVolumes(connector, tableName, TabletsSection.getRange(tableID), v1.toString() + "," + v2.toString() + "," + v4.toString()); + + connector.namespaceOperations().create(namespace2); + + // Select the PreferredVolumeChooser for every table in the second namespace + propertyName = Property.TABLE_VOLUME_CHOOSER.getKey(); + volume = PreferredVolumeChooser.class.getName(); + connector.namespaceOperations().setProperty(namespace2, propertyName, volume); + + // Restrict the second namespace to v1, using the key constant the chooser itself reads + propertyName = PreferredVolumeChooser.PREFERRED_VOLUMES_CUSTOM_KEY; + volume = v1.toString(); + connector.namespaceOperations().setProperty(namespace2, propertyName, volume); + + // Create table2 on namespace2 + String tableName2 = namespace2 + ".1"; + connector.tableOperations().create(tableName2); + String tableID2 = connector.tableOperations().tableIdMap().get(tableName2); + + // Add 10 splits to the table + addSplits(connector, tableName2); + // Write some data to the table + 
writeAndReadData(connector, tableName2); + // Verify the new files are written to the Volumes specified + verifyVolumes(connector, tableName2, TabletsSection.getRange(tableID2), volume); + } + + // Test that uses one table with 10 split points. It uses the PreferredVolumeChooser, but no preferred volume is specified. This means that the volume + // is chosen randomly from all instance volumes. + @Test + public void missingVolumePreferredVolumeChooser() throws Exception { + log.info("Starting missingVolumePreferredVolumeChooser"); + + // Create namespace + Connector connector = getConnector(); + connector.namespaceOperations().create(namespace1); + + // Select the PreferredVolumeChooser for the namespace, deliberately without setting any preferred volumes + String propertyName = Property.TABLE_VOLUME_CHOOSER.getKey(); + String volume = PreferredVolumeChooser.class.getName(); + connector.namespaceOperations().setProperty(namespace1, propertyName, volume); + + // Create table1 on namespace1 + String tableName = namespace1 + ".1"; + connector.tableOperations().create(tableName); + String tableID = connector.tableOperations().tableIdMap().get(tableName); + + // Add 10 splits to the table + addSplits(connector, tableName); + // Write some data to the table + writeAndReadData(connector, tableName); + // Verify that every new file was written to one of the instance volumes v1, v2 or v4 + verifyVolumes(connector, tableName, TabletsSection.getRange(tableID), v1.toString() + "," + v2.toString() + "," + v4.toString()); + } + + // Test that uses one table with 10 split points each. It uses the PreferredVolumeChooser, but preferred volume is not an instance volume. 
This means that the + // volume is chosen randomly from all instance volumes + @Test + public void notInstancePreferredVolumeChooser() throws Exception { + log.info("Starting notInstancePreferredVolumeChooser"); + + // Create namespace + Connector connector = getConnector(); + connector.namespaceOperations().create(namespace1); + + // Select the PreferredVolumeChooser for every table in the namespace + String propertyName = Property.TABLE_VOLUME_CHOOSER.getKey(); + String volume = PreferredVolumeChooser.class.getName(); + connector.namespaceOperations().setProperty(namespace1, propertyName, volume); + + // Prefer v3, which is deliberately not one of the instance volumes (v1, v2 and v4 are passed to verifyVolumes below) + propertyName = PreferredVolumeChooser.PREFERRED_VOLUMES_CUSTOM_KEY; + volume = v3.toString(); + connector.namespaceOperations().setProperty(namespace1, propertyName, volume); + + // Create table1 on namespace1 + String tableName = namespace1 + ".1"; + connector.tableOperations().create(tableName); + String tableID = connector.tableOperations().tableIdMap().get(tableName); + + // Add 10 splits to the table + addSplits(connector, tableName); + // Write some data to the table + writeAndReadData(connector, tableName); + // Verify that every new file was written to one of the instance volumes v1, v2 or v4 + verifyVolumes(connector, tableName, TabletsSection.getRange(tableID), v1.toString() + "," + v2.toString() + "," + v4.toString()); + } + + // Test that uses one table with 10 split points each. It does not specify a specific chooser, so the volume is chosen randomly from all instance volumes. 
+ @Test + public void chooserNotSpecified() throws Exception { + log.info("Starting chooserNotSpecified"); + + // Create a table outside of any test namespace, so no table.volume.chooser property is set by this test + Connector connector = getConnector(); + String tableName = getUniqueNames(2)[0]; + connector.tableOperations().create(tableName); + String tableID = connector.tableOperations().tableIdMap().get(tableName); + + // Add 10 splits to the table + addSplits(connector, tableName); + // Write some data to the table + writeAndReadData(connector, tableName); + + // Verify that every new file was written to one of the instance volumes v1, v2 or v4 + verifyVolumes(connector, tableName, TabletsSection.getRange(tableID), v1.toString() + "," + v2.toString() + "," + v4.toString()); + } + +}
