http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java index 52688cb,0000000..7bc1a80 mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java @@@ -1,71 -1,0 +1,67 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.compaction; + +import java.io.IOException; +import java.util.Map; + +/** + * The interface for customizing major compactions. + * <p> + * The tablet server has one thread to ask many tablets if they should compact. When the strategy returns true, then tablet is added to the queue of tablets + * waiting for a compaction thread. Once a thread is available, the {@link #gatherInformation(MajorCompactionRequest)} method is called outside the tablets' + * lock. This gives the strategy the ability to read information that maybe expensive to fetch. Once the gatherInformation returns, the tablet lock is grabbed + * and the compactionPlan computed. This should *not* do expensive operations, especially not I/O. Note that the number of files may change between calls to + * {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)}. + * <p> + * <b>Note:</b> the strategy object used for the {@link #shouldCompact(MajorCompactionRequest)} call is going to be different from the one used in the + * compaction thread. + */ +public abstract class CompactionStrategy { + + /** + * The settings for the compaction strategy pulled from zookeeper. The <tt>table.compacations.major.strategy.opts</tt> part of the setting will be removed. - * - * @param options + */ + public void init(Map<String,String> options) {} + + /** + * Determine if this tablet is eligible for a major compaction. It's ok if it later determines (through {@link #gatherInformation(MajorCompactionRequest)} and + * {@link #getCompactionPlan(MajorCompactionRequest)}) that it does not need to. Any state stored during shouldCompact will no longer exist when + * {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)} are called. + * + */ + public abstract boolean shouldCompact(MajorCompactionRequest request) throws IOException; + + /** + * Called prior to obtaining the tablet lock, useful for examining metadata or indexes. State collected during this method will be available during the call + * the {@link #getCompactionPlan(MajorCompactionRequest)}. + * + * @param request + * basic details about the tablet - * @throws IOException + */ + public void gatherInformation(MajorCompactionRequest request) throws IOException {} + + /** + * Get the plan for compacting a tablets files. Called while holding the tablet lock, so it should not be doing any blocking. + * + * @param request + * basic details about the tablet + * @return the plan for a major compaction, or null to cancel the compaction. - * @throws IOException + */ + abstract public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException; + +}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java index 25b8043,0000000..a1229e7 mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java @@@ -1,170 -1,0 +1,169 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.logger; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.Help; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.tserver.log.DfsLogger; +import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams; +import org.apache.accumulo.tserver.log.MultiReader; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; + +public class LogReader { + + static class Opts extends Help { + @Parameter(names = "-r", description = "print only mutations associated with the given row") + String row; + @Parameter(names = "-m", description = "limit the number of mutations printed per row") + int maxMutations = 5; + @Parameter(names = "-t", description = "print only mutations that fall within the given key extent") + String extent; + @Parameter(names = "-p", description = "search for a row that matches the given regex") + String regexp; + @Parameter(description = "<logfile> { <logfile> ... }") + List<String> files = new ArrayList<String>(); + } + + /** + * Dump a Log File (Map or Sequence) to stdout. Will read from HDFS or local file system. + * + * @param args + * - first argument is the file to print - * @throws IOException + */ + public static void main(String[] args) throws IOException { + Opts opts = new Opts(); + opts.parseArgs(LogReader.class.getName(), args); + VolumeManager fs = VolumeManagerImpl.get(); + + Matcher rowMatcher = null; + KeyExtent ke = null; + Text row = null; + if (opts.files.isEmpty()) { + new JCommander(opts).usage(); + return; + } + if (opts.row != null) + row = new Text(opts.row); + if (opts.extent != null) { + String sa[] = opts.extent.split(";"); + ke = new KeyExtent(new Text(sa[0]), new Text(sa[1]), new Text(sa[2])); + } + if (opts.regexp != null) { + Pattern pattern = Pattern.compile(opts.regexp); + rowMatcher = pattern.matcher(""); + } + + Set<Integer> tabletIds = new HashSet<Integer>(); + + for (String file : opts.files) { + + Path path = new Path(file); + LogFileKey key = new LogFileKey(); + LogFileValue value = new LogFileValue(); + + if (fs.isFile(path)) { + // read log entries from a simple hdfs file + DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, path, ServerConfiguration.getSiteConfiguration()); + DataInputStream input = streams.getDecryptingInputStream(); + + try { + while (true) { + try { + key.readFields(input); + value.readFields(input); + } catch (EOFException ex) { + break; + } + printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations); + } + } finally { + input.close(); + } + } else { + // read the log entries sorted in a map file + MultiReader input = new MultiReader(fs, path); + while (input.next(key, value)) { + printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations); + } + } + } + } + + public static void printLogEvent(LogFileKey key, LogFileValue value, Text row, Matcher rowMatcher, KeyExtent ke, Set<Integer> tabletIds, int maxMutations) { + + if (ke != null) { + if (key.event == LogEvents.DEFINE_TABLET) { + if (key.tablet.equals(ke)) { + tabletIds.add(key.tid); + } else { + return; + } + } else if (!tabletIds.contains(key.tid)) { + return; + } + } + + if (row != null || rowMatcher != null) { + if (key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS) { + boolean found = false; + for (Mutation m : value.mutations) { + if (row != null && new Text(m.getRow()).equals(row)) { + found = true; + break; + } + + if (rowMatcher != null) { + rowMatcher.reset(new String(m.getRow(), Constants.UTF8)); + if (rowMatcher.matches()) { + found = true; + break; + } + } + } + + if (!found) + return; + } else { + return; + } + + } + + System.out.println(key); + System.out.println(LogFileValue.format(value, maxMutations)); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/start/src/main/java/org/apache/accumulo/start/classloader/AccumuloClassLoader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java ---------------------------------------------------------------------- diff --cc start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java index b7d5fa9,277c741..745401b --- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java +++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java @@@ -36,15 -30,16 +30,16 @@@ public class PostDelegatingVFSClassLoad super(files, manager, parent); } + @Override protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { Class<?> c = findLoadedClass(name); - if (c == null) { - try { - // try finding this class here instead of parent - findClass(name); - } catch (ClassNotFoundException e) { - - } + if (c != null) + return c; + try { + // try finding this class here instead of parent + return findClass(name); + } catch (ClassNotFoundException e) { + } return super.loadClass(name, resolve); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java ---------------------------------------------------------------------- diff --cc start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java index c9fd2f5,104ea09..2973750 --- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java +++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java @@@ -42,76 -42,36 +42,71 @@@ import org.apache.hadoop.fs.Path */ public class HdfsFileSystem extends AbstractFileSystem { - private static final Log log = LogFactory.getLog(HdfsFileSystem.class); - - private FileSystem fs; - - protected HdfsFileSystem(final FileName rootName, final FileSystemOptions fileSystemOptions) + private static final Log log = LogFactory.getLog(HdfsFileSystem.class); + + private FileSystem fs; + - /** - * - * @param rootName - * @param fileSystemOptions - */ + protected HdfsFileSystem(final FileName rootName, final FileSystemOptions fileSystemOptions) + { + super(rootName, null, fileSystemOptions); + } + + /** + * @see org.apache.commons.vfs2.provider.AbstractFileSystem#addCapabilities(java.util.Collection) + */ + @Override + protected void addCapabilities(final Collection<Capability> capabilities) + { + capabilities.addAll(HdfsFileProvider.CAPABILITIES); + } + + /** + * @see org.apache.commons.vfs2.provider.AbstractFileSystem#close() + */ + @Override + synchronized public void close() + { + try { - super(rootName, null, fileSystemOptions); + if (null != fs) + { + fs.close(); + } } - - /** - * @see org.apache.commons.vfs2.provider.AbstractFileSystem#addCapabilities(java.util.Collection) - */ - @Override - protected void addCapabilities(final Collection<Capability> capabilities) + catch (final IOException e) { - capabilities.addAll(HdfsFileProvider.CAPABILITIES); + throw new RuntimeException("Error closing HDFS client", e); } - - /** - * @see org.apache.commons.vfs2.provider.AbstractFileSystem#close() - */ - @Override - public void close() + super.close(); + } + + /** + * @see org.apache.commons.vfs2.provider.AbstractFileSystem#createFile(org.apache.commons.vfs2.provider.AbstractFileName) + */ + @Override + protected FileObject createFile(final AbstractFileName name) throws Exception + { + throw new FileSystemException("Operation not supported"); + } + + /** + * @see org.apache.commons.vfs2.provider.AbstractFileSystem#resolveFile(org.apache.commons.vfs2.FileName) + */ + @Override + public FileObject resolveFile(final FileName name) throws FileSystemException + { + + synchronized (this) { + if (null == this.fs) + { + final String hdfsUri = name.getRootURI(); + final Configuration conf = new Configuration(true); + conf.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, hdfsUri); + this.fs = null; try { - if (null != fs) - { - fs.close(); - } + fs = org.apache.hadoop.fs.FileSystem.get(conf); } catch (final IOException e) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/NativeMapPerformanceTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/NativeMapStressTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/CheckBalance.java ----------------------------------------------------------------------