http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java new file mode 100644 index 0000000..9278cb2 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.tserver.tablet; + +import java.io.IOException; + +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.trace.instrument.Span; +import org.apache.accumulo.trace.instrument.Trace; +import org.apache.accumulo.tserver.MinorCompactionReason; +import org.apache.accumulo.tserver.compaction.MajorCompactionReason; + +class MinorCompactionTask implements Runnable { + + private final Tablet tablet; + private long queued; + private CommitSession commitSession; + private DataFileValue stats; + private FileRef mergeFile; + private long flushId; + private MinorCompactionReason mincReason; + + MinorCompactionTask(Tablet tablet, FileRef mergeFile, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) { + this.tablet = tablet; + queued = System.currentTimeMillis(); + tablet.minorCompactionWaitingToStart(); + this.commitSession = commitSession; + this.mergeFile = mergeFile; + this.flushId = flushId; + this.mincReason = mincReason; + } + + @Override + public void run() { + tablet.isMinorCompactionRunning(); + Span minorCompaction = Trace.on("minorCompaction"); + try { + FileRef newMapfileLocation = tablet.getNextMapFilename(mergeFile == null ? "F" : "M"); + FileRef tmpFileRef = new FileRef(newMapfileLocation.path() + "_tmp"); + Span span = Trace.start("waitForCommits"); + synchronized (tablet) { + commitSession.waitForCommitsToFinish(); + } + span.stop(); + span = Trace.start("start"); + while (true) { + try { + // the purpose of the minor compaction start event is to keep track of the filename... in the case + // where the metadata table write for the minor compaction finishes and the process dies before + // writing the minor compaction finish event, then the start event+filename in metadata table will + // prevent recovery of duplicate data... 
the minor compaction start event could be written at any time + // before the metadata write for the minor compaction + tablet.getTabletServer().minorCompactionStarted(commitSession, commitSession.getWALogSeq() + 1, newMapfileLocation.path().toString()); + break; + } catch (IOException e) { + Tablet.log.warn("Failed to write to write ahead log " + e.getMessage(), e); + } + } + span.stop(); + span = Trace.start("compact"); + this.stats = tablet.minorCompact(tablet.getTabletServer().getFileSystem(), tablet.getTabletMemory().getMinCMemTable(), tmpFileRef, newMapfileLocation, mergeFile, true, queued, commitSession, flushId, + mincReason); + span.stop(); + + if (tablet.needsSplit()) { + tablet.getTabletServer().executeSplit(tablet); + } else { + tablet.initiateMajorCompaction(MajorCompactionReason.NORMAL); + } + } catch (Throwable t) { + Tablet.log.error("Unknown error during minor compaction for extent: " + tablet.getExtent(), t); + throw new RuntimeException(t); + } finally { + tablet.minorCompactionComplete(); + minorCompaction.data("extent", tablet.getExtent().toString()); + minorCompaction.data("numEntries", Long.toString(this.stats.getNumEntries())); + minorCompaction.data("size", Long.toString(this.stats.getSize())); + minorCompaction.stop(); + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java new file mode 100644 index 0000000..6636159 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.accumulo.tserver.tablet;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Random;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.problems.ProblemReport;
import org.apache.accumulo.server.problems.ProblemReports;
import org.apache.accumulo.server.problems.ProblemType;
import org.apache.accumulo.tserver.InMemoryMap;
import org.apache.accumulo.tserver.MinorCompactionReason;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

/**
 * Compactor that writes a tablet's in-memory map, optionally merged with one existing file, to {@code outputFile}.
 *
 * <p>It uses the {@code minc} iterator scope and no extra iterator settings. {@link #call()} keeps retrying failed
 * attempts with randomized exponential backoff, and gives up only if the table is being deleted.
 */
public class MinorCompactor extends Compactor {

  private static final Logger log = Logger.getLogger(MinorCompactor.class);

  private static final Map<FileRef,DataFileValue> EMPTY_MAP = Collections.emptyMap();

  /** Returns the files to merge: none when {@code mergeFile} is null, otherwise just {@code mergeFile -> dfv}. */
  private static Map<FileRef,DataFileValue> toFileMap(FileRef mergeFile, DataFileValue dfv) {
    if (mergeFile == null)
      return EMPTY_MAP;

    return Collections.singletonMap(mergeFile, dfv);
  }

  public MinorCompactor(VolumeManager fs, InMemoryMap imm, FileRef mergeFile, DataFileValue dfv, FileRef outputFile, TableConfiguration acuTableConf,
      KeyExtent extent, MinorCompactionReason mincReason) {
    super(fs, toFileMap(mergeFile, dfv), imm, outputFile, true, acuTableConf, extent, new CompactionEnv() {

      @Override
      public boolean isCompactionEnabled() {
        return true;
      }

      @Override
      public IteratorScope getIteratorScope() {
        return IteratorScope.minc;
      }
    }, Collections.<IteratorSetting>emptyList(), mincReason.ordinal());
  }

  /**
   * Returns true only if ZooKeeper positively reports the table as DELETING. Any lookup failure is logged and treated
   * as "not deleting".
   */
  private boolean isTableDeleting() {
    try {
      return Tables.getTableState(HdfsZooInstance.getInstance(), extent.getTableId().toString()) == TableState.DELETING;
    } catch (Exception e) {
      log.warn("Failed to determine if table " + extent.getTableId() + " was deleting ", e);
      return false; // can not get positive confirmation that its deleting.
    }
  }

  /**
   * Runs the compaction, retrying until it succeeds.
   *
   * <p>After each failed attempt (IOException or RuntimeException), the method:
   * <ul>
   * <li>files a FILE_WRITE problem report;</li>
   * <li>sleeps for a randomized, exponentially growing interval (capped at 3 minutes);</li>
   * <li>removes any partial output file;</li>
   * <li>returns empty stats if the table is being deleted.</li>
   * </ul>
   *
   * <p>A problem report filed during retries is cleared once an attempt succeeds.
   *
   * @throws IllegalStateException
   *           if the compaction is cancelled
   */
  @Override
  public CompactionStats call() {
    log.debug("Begin minor compaction " + getOutputFile() + " " + getExtent());

    // output to new MapFile with a temporary name
    int sleepTime = 100;
    double growthFactor = 4;
    int maxSleepTime = 1000 * 60 * 3; // 3 minutes
    boolean reportedProblem = false;
    // one jitter source for all retries rather than a fresh Random per failed attempt
    Random random = new Random();

    runningCompactions.add(this);
    try {
      do {
        try {
          CompactionStats ret = super.call();

          if (reportedProblem) {
            ProblemReports.getInstance().deleteProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile());
          }

          return ret;
        } catch (IOException e) {
          // pass the exception so the stack trace is not lost
          log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + " retrying ...", e);
          ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e));
          reportedProblem = true;
        } catch (RuntimeException e) {
          // if this is coming from a user iterator, it is possible that the user could change the iterator config and that the
          // minor compaction would succeed
          log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + " retrying ...", e);
          ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e));
          reportedProblem = true;
        } catch (CompactionCanceledException e) {
          throw new IllegalStateException(e);
        }

        int sleep = sleepTime + random.nextInt(sleepTime);
        log.debug("MinC failed sleeping " + sleep + " ms before retrying");
        UtilWaitThread.sleep(sleep);
        sleepTime = (int) Math.round(Math.min(maxSleepTime, sleepTime * growthFactor));

        // clean up any partial output before the next attempt
        try {
          if (getFileSystem().exists(new Path(getOutputFile()))) {
            getFileSystem().deleteRecursively(new Path(getOutputFile()));
          }
        } catch (IOException e) {
          log.warn("Failed to delete failed MinC file " + getOutputFile() + " " + e.getMessage());
        }

        if (isTableDeleting())
          return new CompactionStats(0, 0);

      } while (true);
    } finally {
      thread = null;
      runningCompactions.remove(this);
    }
  }

}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Rate.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Rate.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Rate.java new file mode 100644 index 0000000..450fffe --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Rate.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.accumulo.tserver.tablet;

/**
 * Exponentially smoothed per-second rate derived from successive readings of a counter.
 *
 * <p>Each {@link #update(long, long)} after the first blends the rate observed since the previous reading into the
 * running value:
 * {@code current = (1 - ratio) * current + ratio * (counter - lastCounter) * 1000 / (when - lastTime)}.
 * Times are therefore expected in milliseconds.
 *
 * <p>The first reading only records a baseline. While the stored counter is negative, a reading is treated as the
 * baseline. Updates and reads are synchronized.
 */
public class Rate {
  private long lastCounter = -1;
  private long lastTime = -1;
  private double current = 0.0;
  final double ratio;

  /**
   * Turn a counter into an exponentially smoothed rate over time.
   *
   * @param ratio
   *          the rate at which each update influences the curve; must be (0., 1.0)
   * @throws IllegalArgumentException
   *           if ratio is not strictly between 0 and 1
   */
  public Rate(double ratio) {
    if (ratio <= 0. || ratio >= 1.0)
      throw new IllegalArgumentException("ratio must be > 0. and < 1.0");
    this.ratio = ratio;
  }

  /**
   * Records {@code counter} at the current wall-clock time ({@link System#currentTimeMillis()}).
   *
   * @return the smoothed rate after this update
   */
  public double update(long counter) {
    return update(System.currentTimeMillis(), counter);
  }

  /**
   * Records {@code counter} as observed at time {@code when} (milliseconds).
   *
   * @return the smoothed rate after this update; the unchanged rate if this reading only set the baseline
   * @throws IllegalArgumentException
   *           if {@code when} equals the previous update time (the elapsed time would be zero)
   */
  public synchronized double update(long when, long counter) {
    if (lastCounter < 0) {
      lastTime = when;
      lastCounter = counter;
      return current;
    }
    if (lastTime == when) {
      // the old message ("update time < last value") did not describe the condition actually checked here
      throw new IllegalArgumentException("update time must differ from the last update time: " + when);
    }
    double keep = 1. - ratio;
    current = (keep * current + ratio * ((counter - lastCounter)) * 1000. / (when - lastTime));
    lastTime = when;
    lastCounter = counter;
    return current;
  }

  /** Returns the most recently computed smoothed rate (per second). */
  public synchronized double rate() {
    return this.current;
  }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/RootFiles.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/RootFiles.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/RootFiles.java new file mode 100644 index 0000000..3a8bb08 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/RootFiles.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.accumulo.tserver.tablet;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

/**
 * Static helpers for replacing data files inside a tablet directory purely by renames, and for recovering from a
 * replacement that was interrupted part way through.
 *
 * <p>Old files are first renamed to {@code delete+<compactName>+<name>}. The new file is then renamed into place, and
 * finally the {@code delete+} files are removed.
 *
 * <p>Per the in-code comment in {@link #cleanupReplacement}, this is expected to apply to the root tablet.
 */
public class RootFiles {

  private static final Logger log = Logger.getLogger(RootFiles.class);

  /**
   * Step 1: renames every file in {@code oldDatafiles} to {@code location/delete+compactName+<original name>}.
   */
  public static void prepareReplacement(VolumeManager fs, Path location, Set<FileRef> oldDatafiles, String compactName) throws IOException {
    for (FileRef ref : oldDatafiles) {
      Path path = ref.path();
      DatafileManager.rename(fs, path, new Path(location + "/delete+" + compactName + "+" + path.getName()));
    }
  }

  /**
   * Step 2: renames {@code tmpDatafile} to {@code newDatafile}.
   *
   * @throws IllegalStateException
   *           if {@code newDatafile} already exists
   */
  public static void renameReplacement(VolumeManager fs, FileRef tmpDatafile, FileRef newDatafile) throws IOException {
    if (fs.exists(newDatafile.path())) {
      log.error("Target map file already exist " + newDatafile, new Exception());
      throw new IllegalStateException("Target map file already exist " + newDatafile);
    }

    DatafileManager.rename(fs, tmpDatafile.path(), newDatafile.path());
  }

  /**
   * Step 3: removes the {@code delete+} files created by {@link #prepareReplacement}.
   *
   * <p>Each file is moved to the trash, unless {@link Property#GC_TRASH_IGNORE} is set or {@code moveToTrash} returns
   * false; in those cases it is deleted outright.
   */
  public static void finishReplacement(AccumuloConfiguration acuTableConf, VolumeManager fs, Path location, Set<FileRef> oldDatafiles, String compactName)
      throws IOException {
    // start deleting files, if we do not finish they will be cleaned
    // up later
    for (FileRef ref : oldDatafiles) {
      Path path = ref.path();
      Path deleteFile = new Path(location + "/delete+" + compactName + "+" + path.getName());
      if (acuTableConf.getBoolean(Property.GC_TRASH_IGNORE) || !fs.moveToTrash(deleteFile))
        fs.deleteRecursively(deleteFile);
    }
  }

  /**
   * Replaces {@code oldDatafiles} with {@code tmpDatafile} (renamed to {@code newDatafile}).
   *
   * <p>Runs the three steps in order, using the new file's name as the compaction name.
   */
  public static void replaceFiles(AccumuloConfiguration acuTableConf, VolumeManager fs, Path location, Set<FileRef> oldDatafiles, FileRef tmpDatafile,
      FileRef newDatafile) throws IOException {
    String compactName = newDatafile.path().getName();

    prepareReplacement(fs, location, oldDatafiles, compactName);
    renameReplacement(fs, tmpDatafile, newDatafile);
    finishReplacement(acuTableConf, fs, location, oldDatafiles, compactName);
  }

  /**
   * Repairs the outcome of an interrupted replacement and returns the data files that should be used.
   *
   * <p>For each {@code delete+} file:
   * <ul>
   * <li>if the compacted output exists, the file is deleted;</li>
   * <li>otherwise it is renamed back to its original name.</li>
   * </ul>
   *
   * <p>For the other files:
   * <ul>
   * <li>{@code _tmp} files are skipped, and deleted when {@code deleteTmp} is true;</li>
   * <li>files that are neither {@code <MAPFILE_EXTENSION>_*} nor carry a known extension are logged and skipped.</li>
   * </ul>
   *
   * @throws IllegalArgumentException
   *           if a listed path is not fully qualified (has no URI scheme)
   */
  public static Collection<String> cleanupReplacement(VolumeManager fs, FileStatus[] files, boolean deleteTmp) throws IOException {
    /*
     * called in constructor and before major compactions
     */
    Collection<String> goodFiles = new ArrayList<String>(files.length);

    for (FileStatus file : files) {

      String path = file.getPath().toString();
      if (file.getPath().toUri().getScheme() == null) {
        // depending on the behavior of HDFS, if list status does not return fully qualified volumes then could switch to the default volume
        throw new IllegalArgumentException("Require fully qualified paths " + file.getPath());
      }

      String filename = file.getPath().getName();

      // check for incomplete major compaction, this should only occur
      // for root tablet
      if (filename.startsWith("delete+")) {
        String expectedCompactedFile = path.substring(0, path.lastIndexOf("/delete+")) + "/" + filename.split("\\+")[1];
        if (fs.exists(new Path(expectedCompactedFile))) {
          // compaction finished, but did not finish deleting compacted files.. so delete it
          if (!fs.deleteRecursively(file.getPath()))
            log.warn("Delete of file: " + file.getPath().toString() + " return false");
          continue;
        }
        // compaction did not finish, so put files back

        // reset path and filename for rest of loop
        filename = filename.split("\\+", 3)[2];
        path = path.substring(0, path.lastIndexOf("/delete+")) + "/" + filename;

        DatafileManager.rename(fs, file.getPath(), new Path(path));
      }

      if (filename.endsWith("_tmp")) {
        if (deleteTmp) {
          log.warn("cleaning up old tmp file: " + path);
          if (!fs.deleteRecursively(file.getPath()))
            log.warn("Delete of tmp file: " + file.getPath().toString() + " return false");

        }
        continue;
      }

      if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") && !hasValidExtension(filename)) {
        log.error("unknown file in tablet " + path);
        continue;
      }

      goodFiles.add(path);
    }

    return goodFiles;
  }

  /**
   * Returns true if the second dot-separated segment of {@code filename} is a known file extension.
   *
   * <p>Returns false, rather than throwing ArrayIndexOutOfBoundsException as a bare {@code split("\\.")[1]} would,
   * when the name has no such segment.
   */
  private static boolean hasValidExtension(String filename) {
    String[] parts = filename.split("\\.");
    return parts.length > 1 && FileOperations.getValidExtensions().contains(parts[1]);
  }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanBatch.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanBatch.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanBatch.java new file mode 100644 index 0000000..0ea76d3 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanBatch.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.tablet; + +import java.util.List; + +public class ScanBatch { + public final boolean more; + public final List<KVEntry> results; + + ScanBatch(List<KVEntry> results, boolean more) { + this.results = results; + this.more = more; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java new file mode 100644 index 0000000..980a082 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.tablet; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.thrift.IterInfo; +import org.apache.accumulo.core.iterators.IterationInterruptedException; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; +import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter; +import org.apache.accumulo.core.iterators.system.DeletingIterator; +import org.apache.accumulo.core.iterators.system.InterruptibleIterator; +import org.apache.accumulo.core.iterators.system.MultiIterator; +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource; +import org.apache.accumulo.core.iterators.system.StatsIterator; +import org.apache.accumulo.core.iterators.system.VisibilityFilter; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.tserver.FileManager.ScanFileManager; +import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator; +import org.apache.accumulo.tserver.TabletIteratorEnvironment; +import org.apache.accumulo.tserver.TabletServer; + +class ScanDataSource implements DataSource { + + // data source 
state + private final Tablet tablet; + private ScanFileManager fileManager; + private SortedKeyValueIterator<Key,Value> iter; + private long expectedDeletionCount; + private List<MemoryIterator> memIters = null; + private long fileReservationId; + private AtomicBoolean interruptFlag; + private StatsIterator statsIterator; + + private final ScanOptions options; + + ScanDataSource(Tablet tablet, Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, + AtomicBoolean interruptFlag) { + this.tablet = tablet; + expectedDeletionCount = tablet.getDataSourceDeletions(); + this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false); + this.interruptFlag = interruptFlag; + } + + ScanDataSource(Tablet tablet, ScanOptions options) { + this.tablet = tablet; + expectedDeletionCount = tablet.getDataSourceDeletions(); + this.options = options; + this.interruptFlag = options.interruptFlag; + } + + @Override + public DataSource getNewDataSource() { + if (!isCurrent()) { + // log.debug("Switching data sources during a scan"); + if (memIters != null) { + tablet.getTabletMemory().returnIterators(memIters); + memIters = null; + tablet.getDatafileManager().returnFilesForScan(fileReservationId); + fileReservationId = -1; + } + + if (fileManager != null) + fileManager.releaseOpenFiles(false); + + expectedDeletionCount = tablet.getDataSourceDeletions(); + iter = null; + + return this; + } else + return this; + } + + @Override + public boolean isCurrent() { + return expectedDeletionCount == tablet.getDataSourceDeletions(); + } + + @Override + public SortedKeyValueIterator<Key,Value> iterator() throws IOException { + if (iter == null) + iter = createIterator(); + return iter; + } + + private SortedKeyValueIterator<Key,Value> createIterator() throws IOException { + + Map<FileRef,DataFileValue> files; + + synchronized (tablet) { + + if (memIters != null) + 
throw new IllegalStateException("Tried to create new scan iterator w/o releasing memory"); + + if (tablet.isClosed()) + throw new TabletClosedException(); + + if (interruptFlag.get()) + throw new IterationInterruptedException(tablet.getExtent().toString() + " " + interruptFlag.hashCode()); + + // only acquire the file manager when we know the tablet is open + if (fileManager == null) { + fileManager = tablet.getTabletResources().newScanFileManager(); + tablet.addActiveScans(this); + } + + if (fileManager.getNumOpenFiles() != 0) + throw new IllegalStateException("Tried to create new scan iterator w/o releasing files"); + + // set this before trying to get iterators in case + // getIterators() throws an exception + expectedDeletionCount = tablet.getDataSourceDeletions(); + + memIters = tablet.getTabletMemory().getIterators(); + Pair<Long,Map<FileRef,DataFileValue>> reservation = tablet.getDatafileManager().reserveFilesForScan(); + fileReservationId = reservation.getFirst(); + files = reservation.getSecond(); + } + + Collection<InterruptibleIterator> mapfiles = fileManager.openFiles(files, options.isolated); + + List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(mapfiles.size() + memIters.size()); + + iters.addAll(mapfiles); + iters.addAll(memIters); + + for (SortedKeyValueIterator<Key,Value> skvi : iters) + ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag); + + MultiIterator multiIter = new MultiIterator(iters, tablet.getExtent()); + + TabletIteratorEnvironment iterEnv = new TabletIteratorEnvironment(IteratorScope.scan, tablet.getTableConfiguration(), fileManager, files); + + statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, tablet.getScannedCounter()); + + DeletingIterator delIter = new DeletingIterator(statsIterator, false); + + ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter); + + ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, 
options.columnSet); + + VisibilityFilter visFilter = new VisibilityFilter(colFilter, options.authorizations, options.defaultLabels); + + return iterEnv.getTopLevelIterator(IteratorUtil + .loadIterators(IteratorScope.scan, visFilter, tablet.getExtent(), tablet.getTableConfiguration(), options.ssiList, options.ssio, iterEnv)); + } + + void close(boolean sawErrors) { + + if (memIters != null) { + tablet.getTabletMemory().returnIterators(memIters); + memIters = null; + tablet.getDatafileManager().returnFilesForScan(fileReservationId); + fileReservationId = -1; + } + + synchronized (tablet) { + if (tablet.removeScan(this) == 0) + tablet.notifyAll(); + } + + if (fileManager != null) { + fileManager.releaseOpenFiles(sawErrors); + fileManager = null; + } + + if (statsIterator != null) { + statsIterator.report(); + } + + } + + public void interrupt() { + interruptFlag.set(true); + } + + @Override + public DataSource getDeepCopyDataSource(IteratorEnvironment env) { + throw new UnsupportedOperationException(); + } + + public void reattachFileManager() throws IOException { + if (fileManager != null) + fileManager.reattach(); + } + + public void detachFileManager() { + if (fileManager != null) + fileManager.detach(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java new file mode 100644 index 0000000..9382ea7 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.tablet; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.data.thrift.IterInfo; +import org.apache.accumulo.core.security.Authorizations; + +class ScanOptions { + + final Authorizations authorizations; + final byte[] defaultLabels; + final Set<Column> columnSet; + final List<IterInfo> ssiList; + final Map<String,Map<String,String>> ssio; + final AtomicBoolean interruptFlag; + final int num; + final boolean isolated; + + ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, Set<Column> columnSet, List<IterInfo> ssiList, + Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, boolean isolated) { + this.num = num; + this.authorizations = authorizations; + this.defaultLabels = defaultLabels; + this.columnSet = columnSet; + this.ssiList = ssiList; + this.ssio = ssio; + this.interruptFlag = interruptFlag; + this.isolated = isolated; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java 
---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java new file mode 100644 index 0000000..96379fc --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.tserver.tablet; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IterationInterruptedException; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator; +import org.apache.log4j.Logger; + +public class Scanner { + private static final Logger log = Logger.getLogger(Scanner.class); + + private final Tablet tablet; + private final ScanOptions options; + private Range range; + private SortedKeyValueIterator<Key,Value> isolatedIter; + private ScanDataSource isolatedDataSource; + private boolean sawException = false; + private boolean scanClosed = false; + + Scanner(Tablet tablet, Range range, ScanOptions options) { + this.tablet = tablet; + this.range = range; + this.options = options; + } + + public synchronized ScanBatch read() throws IOException, TabletClosedException { + + if (sawException) + throw new IllegalStateException("Tried to use scanner after exception occurred."); + + if (scanClosed) + throw new IllegalStateException("Tried to use scanner after it was closed."); + + Batch results = null; + + ScanDataSource dataSource; + + if (options.isolated) { + if (isolatedDataSource == null) + isolatedDataSource = new ScanDataSource(tablet, options); + dataSource = isolatedDataSource; + } else { + dataSource = new ScanDataSource(tablet, options); + } + + try { + + SortedKeyValueIterator<Key,Value> iter; + + if (options.isolated) { + if (isolatedIter == null) + isolatedIter = new SourceSwitchingIterator(dataSource, true); + else + isolatedDataSource.reattachFileManager(); + iter = isolatedIter; + } else { + iter = new SourceSwitchingIterator(dataSource, false); + } + + results = tablet.nextBatch(iter, range, options.num, options.columnSet); + + if (results.results == 
null) { + range = null; + return new ScanBatch(new ArrayList<KVEntry>(), false); + } else if (results.continueKey == null) { + return new ScanBatch(results.results, false); + } else { + range = new Range(results.continueKey, !results.skipContinueKey, range.getEndKey(), range.isEndKeyInclusive()); + return new ScanBatch(results.results, true); + } + + } catch (IterationInterruptedException iie) { + sawException = true; + if (tablet.isClosed()) + throw new TabletClosedException(iie); + else + throw iie; + } catch (IOException ioe) { + if (tablet.shutdownInProgress()) { + log.debug("IOException while shutdown in progress ", ioe); + throw new TabletClosedException(ioe); // assume IOException was caused by execution of HDFS shutdown hook + } + + sawException = true; + dataSource.close(true); + throw ioe; + } catch (RuntimeException re) { + sawException = true; + throw re; + } finally { + // code in finally block because always want + // to return mapfiles, even when exception is thrown + if (!options.isolated) + dataSource.close(false); + else + dataSource.detachFileManager(); + + if (results != null && results.results != null) + tablet.updateQueryStats(results.results.size(), results.numBytes); + } + } + + // close and read are synchronized because can not call close on the data source while it is in use + // this could lead to the case where file iterators that are in use by a thread are returned + // to the pool... 
this would be bad + public void close() { + options.interruptFlag.set(true); + synchronized (this) { + scanClosed = true; + if (isolatedDataSource != null) + isolatedDataSource.close(false); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java new file mode 100644 index 0000000..084503a --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.tserver.tablet; + +import java.util.SortedMap; + +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.server.master.state.TServerInstance; + +/** + * operations are disallowed while we split which is ok since splitting is fast + * + * a minor compaction should have taken place before calling this so there should be relatively little left to compact + * + * we just need to make sure major compactions aren't occurring if we have the major compactor thread decide who needs splitting we can avoid synchronization + * issues with major compactions + * + */ + +public class SplitInfo { + final String dir; + final SortedMap<FileRef,DataFileValue> datafiles; + final String time; + final long initFlushID; + final long initCompactID; + final TServerInstance lastLocation; + + SplitInfo(String d, SortedMap<FileRef,DataFileValue> dfv, String time, long initFlushID, long initCompactID, TServerInstance lastLocation) { + this.dir = d; + this.datafiles = dfv; + this.time = time; + this.initFlushID = initFlushID; + this.initCompactID = initCompactID; + this.lastLocation = lastLocation; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitRowSpec.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitRowSpec.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitRowSpec.java new file mode 100644 index 0000000..75cf91e --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitRowSpec.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.tserver.tablet;

import org.apache.hadoop.io.Text;

/**
 * Pairs a row with a split ratio. Presumably this describes a candidate split point for a tablet
 * (TODO confirm against callers).
 */
class SplitRowSpec {
  /** Ratio associated with {@link #row}; meaning not visible here (TODO confirm). */
  final double splitRatio;

  /** The row, presumably the one at which the tablet would be split. */
  final Text row;

  SplitRowSpec(double splitRatio, Text row) {
    this.row = row;
    this.splitRatio = splitRatio;
  }
}
// \ No newline at end of file