ACCUMULO-2041 extract tablet classes to new files, move tablet-related code to o.a.a.tserver.tablet, make member variables private
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7db2abf1 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7db2abf1 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7db2abf1 Branch: refs/heads/ACCUMULO-378 Commit: 7db2abf19c2e0585b2f3ea32068c3d62bd891590 Parents: 9e770ca Author: Eric C. Newton <eric.new...@gmail.com> Authored: Mon Apr 21 13:12:13 2014 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Tue Jun 3 10:49:43 2014 -0400 ---------------------------------------------------------------------- .../accumulo/tserver/CompactionStats.java | 59 - .../accumulo/tserver/CompactionWatcher.java | 110 - .../org/apache/accumulo/tserver/Compactor.java | 548 --- .../apache/accumulo/tserver/FileManager.java | 12 +- .../apache/accumulo/tserver/InMemoryMap.java | 2 +- .../accumulo/tserver/MinorCompactionReason.java | 21 + .../apache/accumulo/tserver/MinorCompactor.java | 146 - .../java/org/apache/accumulo/tserver/Rate.java | 60 - .../org/apache/accumulo/tserver/RootFiles.java | 133 - .../tserver/TConstraintViolationException.java | 54 + .../org/apache/accumulo/tserver/Tablet.java | 3810 ------------------ .../tserver/TabletIteratorEnvironment.java | 8 +- .../apache/accumulo/tserver/TabletServer.java | 79 +- .../tserver/TabletServerResourceManager.java | 67 +- .../accumulo/tserver/TabletStatsKeeper.java | 5 + .../tserver/log/TabletServerLogger.java | 4 +- .../apache/accumulo/tserver/tablet/Batch.java | 35 + .../accumulo/tserver/tablet/CommitSession.java | 121 + .../accumulo/tserver/tablet/CompactionInfo.java | 113 + .../tserver/tablet/CompactionRunner.java | 76 + .../tserver/tablet/CompactionStats.java | 59 + .../tserver/tablet/CompactionWatcher.java | 109 + .../accumulo/tserver/tablet/Compactor.java | 477 +++ .../tserver/tablet/DatafileManager.java | 581 +++ .../apache/accumulo/tserver/tablet/KVEntry.java | 39 + .../tserver/tablet/MinorCompactionTask.java | 96 + .../accumulo/tserver/tablet/MinorCompactor.java | 145 + .../apache/accumulo/tserver/tablet/Rate.java | 60 + .../accumulo/tserver/tablet/RootFiles.java | 133 + .../accumulo/tserver/tablet/ScanBatch.java | 29 + .../accumulo/tserver/tablet/ScanDataSource.java | 222 + .../accumulo/tserver/tablet/ScanOptions.java | 51 + .../apache/accumulo/tserver/tablet/Scanner.java | 135 + .../accumulo/tserver/tablet/SplitInfo.java | 52 + .../accumulo/tserver/tablet/SplitRowSpec.java | 29 + .../apache/accumulo/tserver/tablet/Tablet.java | 2564 ++++++++++++ .../tserver/tablet/TabletClosedException.java | 29 + .../tserver/tablet/TabletCommitter.java | 48 + .../accumulo/tserver/tablet/TabletMemory.java | 190 + .../accumulo/tserver/CountingIteratorTest.java | 2 +- .../apache/accumulo/tserver/RootFilesTest.java | 1 + 41 files changed, 5561 insertions(+), 4953 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionStats.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionStats.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionStats.java deleted file mode 100644 index d359e95..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionStats.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.tserver; - -public class CompactionStats { - private long entriesRead; - private long entriesWritten; - private long fileSize; - - CompactionStats(long er, long ew) { - this.setEntriesRead(er); - this.setEntriesWritten(ew); - } - - public CompactionStats() {} - - private void setEntriesRead(long entriesRead) { - this.entriesRead = entriesRead; - } - - public long getEntriesRead() { - return entriesRead; - } - - private void setEntriesWritten(long entriesWritten) { - this.entriesWritten = entriesWritten; - } - - public long getEntriesWritten() { - return entriesWritten; - } - - public void add(CompactionStats mcs) { - this.entriesRead += mcs.entriesRead; - this.entriesWritten += mcs.entriesWritten; - } - - public void setFileSize(long fileSize) { - this.fileSize = fileSize; - } - - public long getFileSize() { - return this.fileSize; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java deleted file mode 100644 index 2e4d7b7..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.tserver; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.server.util.time.SimpleTimer; -import org.apache.accumulo.tserver.Compactor.CompactionInfo; -import org.apache.log4j.Logger; - -/** - * - */ -public class CompactionWatcher implements Runnable { - private Map<List<Long>,ObservedCompactionInfo> observedCompactions = new HashMap<List<Long>,ObservedCompactionInfo>(); - private AccumuloConfiguration config; - private static boolean watching = false; - - private static class ObservedCompactionInfo { - CompactionInfo compactionInfo; - long firstSeen; - boolean loggedWarning; - - ObservedCompactionInfo(CompactionInfo ci, long time) { - this.compactionInfo = ci; - this.firstSeen = time; - } - } - - public CompactionWatcher(AccumuloConfiguration config) { - this.config = config; - } - - public void run() { - List<CompactionInfo> runningCompactions = Compactor.getRunningCompactions(); - - Set<List<Long>> newKeys = new HashSet<List<Long>>(); - - long time = System.currentTimeMillis(); - - for (CompactionInfo ci : runningCompactions) { - List<Long> compactionKey = Arrays.asList(ci.getID(), ci.getEntriesRead(), ci.getEntriesWritten()); - newKeys.add(compactionKey); - - if (!observedCompactions.containsKey(compactionKey)) { - observedCompactions.put(compactionKey, new ObservedCompactionInfo(ci, time)); - } - } - - // look for compactions that finished or made progress and logged a warning - HashMap<List<Long>,ObservedCompactionInfo> copy = new HashMap<List<Long>,ObservedCompactionInfo>(observedCompactions); - copy.keySet().removeAll(newKeys); - - for (ObservedCompactionInfo oci : copy.values()) { - if (oci.loggedWarning) { - Logger.getLogger(CompactionWatcher.class).info("Compaction of " + oci.compactionInfo.getExtent() + " is no longer stuck"); - } - } - - // remove any compaction that completed or made progress - observedCompactions.keySet().retainAll(newKeys); - - long warnTime = config.getTimeInMillis(Property.TSERV_COMPACTION_WARN_TIME); - - // check for stuck compactions - for (ObservedCompactionInfo oci : observedCompactions.values()) { - if (time - oci.firstSeen > warnTime && !oci.loggedWarning) { - Thread compactionThread = oci.compactionInfo.getThread(); - if (compactionThread != null) { - StackTraceElement[] trace = compactionThread.getStackTrace(); - Exception e = new Exception("Possible stack trace of compaction stuck on " + oci.compactionInfo.getExtent()); - e.setStackTrace(trace); - Logger.getLogger(CompactionWatcher.class).warn( - "Compaction of " + oci.compactionInfo.getExtent() + " to " + oci.compactionInfo.getOutputFile() + " has not made progress for at least " - + (time - oci.firstSeen) + "ms", e); - oci.loggedWarning = true; - } - } - } - } - - public static synchronized void startWatching(AccumuloConfiguration config) { - if (!watching) { - SimpleTimer.getInstance(config).schedule(new CompactionWatcher(config), 10000, 10000); - watching = true; - } - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/Compactor.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Compactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Compactor.java deleted file mode 100644 index 822171c..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Compactor.java +++ /dev/null @@ -1,548 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.tserver; - -import java.io.IOException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.data.ByteSequence; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.data.thrift.IterInfo; -import org.apache.accumulo.core.file.FileOperations; -import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.file.FileSKVWriter; -import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.IteratorUtil; -import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.iterators.WrappingIterator; -import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; -import org.apache.accumulo.core.iterators.system.DeletingIterator; -import org.apache.accumulo.core.iterators.system.MultiIterator; -import org.apache.accumulo.core.iterators.system.TimeSettingIterator; -import org.apache.accumulo.core.metadata.schema.DataFileValue; -import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; -import org.apache.accumulo.core.tabletserver.thrift.CompactionReason; -import org.apache.accumulo.core.tabletserver.thrift.CompactionType; -import org.apache.accumulo.core.util.LocalityGroupUtil; -import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; -import org.apache.accumulo.server.fs.FileRef; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.problems.ProblemReport; -import org.apache.accumulo.server.problems.ProblemReportingIterator; -import org.apache.accumulo.server.problems.ProblemReports; -import org.apache.accumulo.server.problems.ProblemType; -import org.apache.accumulo.trace.instrument.Span; -import org.apache.accumulo.trace.instrument.Trace; -import org.apache.accumulo.tserver.Tablet.MinorCompactionReason; -import org.apache.accumulo.tserver.compaction.MajorCompactionReason; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.Logger; - -public class Compactor implements Callable<CompactionStats> { - - public static class CountingIterator extends WrappingIterator { - - private long count; - private ArrayList<CountingIterator> deepCopies; - private AtomicLong entriesRead; - - @Override - public CountingIterator deepCopy(IteratorEnvironment env) { - return new CountingIterator(this, env); - } - - private CountingIterator(CountingIterator other, IteratorEnvironment env) { - setSource(other.getSource().deepCopy(env)); - count = 0; - this.deepCopies = other.deepCopies; - this.entriesRead = other.entriesRead; - deepCopies.add(this); - } - - public CountingIterator(SortedKeyValueIterator<Key,Value> source, AtomicLong entriesRead) { - deepCopies = new ArrayList<Compactor.CountingIterator>(); - this.setSource(source); - count = 0; - this.entriesRead = entriesRead; - } - - @Override - public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) { - throw new UnsupportedOperationException(); - } - - @Override - public void next() throws IOException { - super.next(); - count++; - if (count % 1024 == 0) { - entriesRead.addAndGet(1024); - } - } - - public long getCount() { - long sum = 0; - for (CountingIterator dc : deepCopies) { - sum += dc.count; - } - - return count + sum; - } - } - - private static final Logger log = Logger.getLogger(Compactor.class); - - static class CompactionCanceledException extends Exception { - private static final long serialVersionUID = 1L; - } - - interface CompactionEnv { - boolean isCompactionEnabled(); - - IteratorScope getIteratorScope(); - } - - private Map<FileRef,DataFileValue> filesToCompact; - private InMemoryMap imm; - private FileRef outputFile; - private boolean propogateDeletes; - private AccumuloConfiguration acuTableConf; - private CompactionEnv env; - private Configuration conf; - private VolumeManager fs; - protected KeyExtent extent; - private List<IteratorSetting> iterators; - - // things to report - private String currentLocalityGroup = ""; - private long startTime; - - private MajorCompactionReason reason; - protected MinorCompactionReason mincReason; - - private AtomicLong entriesRead = new AtomicLong(0); - private AtomicLong entriesWritten = new AtomicLong(0); - private DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"); - - private static AtomicLong nextCompactorID = new AtomicLong(0); - - // a unique id to identify a compactor - private long compactorID = nextCompactorID.getAndIncrement(); - - protected volatile Thread thread; - - private synchronized void setLocalityGroup(String name) { - this.currentLocalityGroup = name; - } - - private void clearStats() { - entriesRead.set(0); - entriesWritten.set(0); - } - - protected static final Set<Compactor> runningCompactions = Collections.synchronizedSet(new HashSet<Compactor>()); - - public static class CompactionInfo { - - private Compactor compactor; - private String localityGroup; - private long entriesRead; - private long entriesWritten; - - CompactionInfo(Compactor compactor) { - this.localityGroup = compactor.currentLocalityGroup; - this.entriesRead = compactor.entriesRead.get(); - this.entriesWritten = compactor.entriesWritten.get(); - this.compactor = compactor; - } - - public long getID() { - return compactor.compactorID; - } - - public KeyExtent getExtent() { - return compactor.getExtent(); - } - - public long getEntriesRead() { - return entriesRead; - } - - public long getEntriesWritten() { - return entriesWritten; - } - - public Thread getThread() { - return compactor.thread; - } - - public String getOutputFile() { - return compactor.getOutputFile(); - } - - public ActiveCompaction toThrift() { - - CompactionType type; - - if (compactor.imm != null) - if (compactor.filesToCompact.size() > 0) - type = CompactionType.MERGE; - else - type = CompactionType.MINOR; - else if (!compactor.propogateDeletes) - type = CompactionType.FULL; - else - type = CompactionType.MAJOR; - - CompactionReason reason; - - if (compactor.imm != null) - switch (compactor.mincReason) { - case USER: - reason = CompactionReason.USER; - break; - case CLOSE: - reason = CompactionReason.CLOSE; - break; - case SYSTEM: - default: - reason = CompactionReason.SYSTEM; - break; - } - else - switch (compactor.reason) { - case USER: - reason = CompactionReason.USER; - break; - case CHOP: - reason = CompactionReason.CHOP; - break; - case IDLE: - reason = CompactionReason.IDLE; - break; - case NORMAL: - default: - reason = CompactionReason.SYSTEM; - break; - } - - List<IterInfo> iiList = new ArrayList<IterInfo>(); - Map<String,Map<String,String>> iterOptions = new HashMap<String,Map<String,String>>(); - - for (IteratorSetting iterSetting : compactor.iterators) { - iiList.add(new IterInfo(iterSetting.getPriority(), iterSetting.getIteratorClass(), iterSetting.getName())); - iterOptions.put(iterSetting.getName(), iterSetting.getOptions()); - } - List<String> filesToCompact = new ArrayList<String>(); - for (FileRef ref : compactor.filesToCompact.keySet()) - filesToCompact.add(ref.toString()); - return new ActiveCompaction(compactor.extent.toThrift(), System.currentTimeMillis() - compactor.startTime, filesToCompact, - compactor.outputFile.toString(), type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions); - } - } - - public static List<CompactionInfo> getRunningCompactions() { - ArrayList<CompactionInfo> compactions = new ArrayList<Compactor.CompactionInfo>(); - - synchronized (runningCompactions) { - for (Compactor compactor : runningCompactions) { - compactions.add(new CompactionInfo(compactor)); - } - } - - return compactions; - } - - Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes, - AccumuloConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, MajorCompactionReason reason) { - this.extent = extent; - this.conf = conf; - this.fs = fs; - this.filesToCompact = files; - this.imm = imm; - this.outputFile = outputFile; - this.propogateDeletes = propogateDeletes; - this.acuTableConf = acuTableConf; - this.env = env; - this.iterators = iterators; - this.reason = reason; - - startTime = System.currentTimeMillis(); - } - - Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes, - AccumuloConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) { - this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new ArrayList<IteratorSetting>(), null); - } - - public VolumeManager getFileSystem() { - return fs; - } - - KeyExtent getExtent() { - return extent; - } - - String getOutputFile() { - return outputFile.toString(); - } - - @Override - public CompactionStats call() throws IOException, CompactionCanceledException { - - FileSKVWriter mfw = null; - - CompactionStats majCStats = new CompactionStats(); - - boolean remove = runningCompactions.add(this); - - clearStats(); - - String oldThreadName = Thread.currentThread().getName(); - String newThreadName = "MajC compacting " + extent.toString() + " started " + dateFormatter.format(new Date()) + " file: " + outputFile; - Thread.currentThread().setName(newThreadName); - thread = Thread.currentThread(); - try { - FileOperations fileFactory = FileOperations.getInstance(); - FileSystem ns = this.fs.getVolumeByPath(outputFile.path()).getFileSystem(); - mfw = fileFactory.openWriter(outputFile.path().toString(), ns, ns.getConf(), acuTableConf); - - Map<String,Set<ByteSequence>> lGroups; - try { - lGroups = LocalityGroupUtil.getLocalityGroups(acuTableConf); - } catch (LocalityGroupConfigurationError e) { - throw new IOException(e); - } - - long t1 = System.currentTimeMillis(); - - HashSet<ByteSequence> allColumnFamilies = new HashSet<ByteSequence>(); - - if (mfw.supportsLocalityGroups()) { - for (Entry<String,Set<ByteSequence>> entry : lGroups.entrySet()) { - setLocalityGroup(entry.getKey()); - compactLocalityGroup(entry.getKey(), entry.getValue(), true, mfw, majCStats); - allColumnFamilies.addAll(entry.getValue()); - } - } - - setLocalityGroup(""); - compactLocalityGroup(null, allColumnFamilies, false, mfw, majCStats); - - long t2 = System.currentTimeMillis(); - - FileSKVWriter mfwTmp = mfw; - mfw = null; // set this to null so we do not try to close it again in finally if the close fails - mfwTmp.close(); // if the close fails it will cause the compaction to fail - - // Verify the file, since hadoop 0.20.2 sometimes lies about the success of close() - try { - FileSKVIterator openReader = fileFactory.openReader(outputFile.path().toString(), false, ns, ns.getConf(), acuTableConf); - openReader.close(); - } catch (IOException ex) { - log.error("Verification of successful compaction fails!!! " + extent + " " + outputFile, ex); - throw ex; - } - - log.debug(String.format("Compaction %s %,d read | %,d written | %,6d entries/sec | %6.3f secs", extent, majCStats.getEntriesRead(), - majCStats.getEntriesWritten(), (int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0)); - - majCStats.setFileSize(fileFactory.getFileSize(outputFile.path().toString(), ns, ns.getConf(), acuTableConf)); - return majCStats; - } catch (IOException e) { - log.error(e, e); - throw e; - } catch (RuntimeException e) { - log.error(e, e); - throw e; - } finally { - Thread.currentThread().setName(oldThreadName); - if (remove) { - thread = null; - runningCompactions.remove(this); - } - - try { - if (mfw != null) { - // compaction must not have finished successfully, so close its output file - try { - mfw.close(); - } finally { - if (!fs.deleteRecursively(outputFile.path())) - if (fs.exists(outputFile.path())) - log.error("Unable to delete " + outputFile); - } - } - } catch (IOException e) { - log.warn(e, e); - } catch (RuntimeException exception) { - log.warn(exception, exception); - } - } - } - - private List<SortedKeyValueIterator<Key,Value>> openMapDataFiles(String lgName, ArrayList<FileSKVIterator> readers) throws IOException { - - List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size()); - - for (FileRef mapFile : filesToCompact.keySet()) { - try { - - FileOperations fileFactory = FileOperations.getInstance(); - FileSystem fs = this.fs.getVolumeByPath(mapFile.path()).getFileSystem(); - FileSKVIterator reader; - - reader = fileFactory.openReader(mapFile.path().toString(), false, fs, conf, acuTableConf); - - readers.add(reader); - - SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(extent.getTableId().toString(), mapFile.path().toString(), false, reader); - - if (filesToCompact.get(mapFile).isTimeSet()) { - iter = new TimeSettingIterator(iter, filesToCompact.get(mapFile).getTime()); - } - - iters.add(iter); - - } catch (Throwable e) { - - ProblemReports.getInstance().report(new ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile.path().toString(), e)); - - log.warn("Some problem opening map file " + mapFile + " " + e.getMessage(), e); - // failed to open some map file... close the ones that were opened - for (FileSKVIterator reader : readers) { - try { - reader.close(); - } catch (Throwable e2) { - log.warn("Failed to close map file", e2); - } - } - - readers.clear(); - - if (e instanceof IOException) - throw (IOException) e; - throw new IOException("Failed to open map data files", e); - } - } - - return iters; - } - - private void compactLocalityGroup(String lgName, Set<ByteSequence> columnFamilies, boolean inclusive, FileSKVWriter mfw, CompactionStats majCStats) - throws IOException, CompactionCanceledException { - ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(filesToCompact.size()); - Span span = Trace.start("compact"); - try { - long entriesCompacted = 0; - List<SortedKeyValueIterator<Key,Value>> iters = openMapDataFiles(lgName, readers); - - if (imm != null) { - iters.add(imm.compactionIterator()); - } - - CountingIterator citr = new CountingIterator(new MultiIterator(iters, extent.toDataRange()), entriesRead); - DeletingIterator delIter = new DeletingIterator(citr, propogateDeletes); - ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter); - - // if(env.getIteratorScope() ) - - TabletIteratorEnvironment iterEnv; - if (env.getIteratorScope() == IteratorScope.majc) - iterEnv = new TabletIteratorEnvironment(IteratorScope.majc, !propogateDeletes, acuTableConf); - else if (env.getIteratorScope() == IteratorScope.minc) - iterEnv = new TabletIteratorEnvironment(IteratorScope.minc, acuTableConf); - else - throw new IllegalArgumentException(); - - SortedKeyValueIterator<Key,Value> itr = iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(env.getIteratorScope(), cfsi, extent, acuTableConf, - iterators, iterEnv)); - - itr.seek(extent.toDataRange(), columnFamilies, inclusive); - - if (!inclusive) { - mfw.startDefaultLocalityGroup(); - } else { - mfw.startNewLocalityGroup(lgName, columnFamilies); - } - - Span write = Trace.start("write"); - try { - while (itr.hasTop() && env.isCompactionEnabled()) { - mfw.append(itr.getTopKey(), itr.getTopValue()); - itr.next(); - entriesCompacted++; - - if (entriesCompacted % 1024 == 0) { - // Periodically update stats, do not want to do this too often since its volatile - entriesWritten.addAndGet(1024); - } - } - - if (itr.hasTop() && !env.isCompactionEnabled()) { - // cancel major compaction operation - try { - try { - mfw.close(); - } catch (IOException e) { - log.error(e, e); - } - fs.deleteRecursively(outputFile.path()); - } catch (Exception e) { - log.warn("Failed to delete Canceled compaction output file " + outputFile, e); - } - throw new CompactionCanceledException(); - } - - } finally { - CompactionStats lgMajcStats = new CompactionStats(citr.getCount(), entriesCompacted); - majCStats.add(lgMajcStats); - write.stop(); - } - - } finally { - // close sequence files opened - for (FileSKVIterator reader : readers) { - try { - reader.close(); - } catch (Throwable e) { - log.warn("Failed to close map file", e); - } - } - span.stop(); - } - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java index e8958b1..017398e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java @@ -161,7 +161,7 @@ public class FileManager { * @param indexCache * : underlying file can and should be able to handle a null cache */ - FileManager(ServerConfiguration conf, VolumeManager fs, int maxOpen, BlockCache dataCache, BlockCache indexCache) { + public FileManager(ServerConfiguration conf, VolumeManager fs, int maxOpen, BlockCache dataCache, BlockCache indexCache) { if (maxOpen <= 0) throw new IllegalArgumentException("maxOpen <= 0"); @@ -481,7 +481,7 @@ public class FileManager { return newlyReservedReaders; } - synchronized List<InterruptibleIterator> openFiles(Map<FileRef,DataFileValue> files, boolean detachable) throws IOException { + public synchronized List<InterruptibleIterator> openFiles(Map<FileRef,DataFileValue> files, boolean detachable) throws IOException { List<FileSKVIterator> newlyReservedReaders = openFileRefs(files.keySet()); @@ -509,7 +509,7 @@ public class FileManager { return iters; } - synchronized void detach() { + public synchronized void detach() { releaseReaders(tabletReservedReaders, false); tabletReservedReaders.clear(); @@ -518,7 +518,7 @@ public class FileManager { fds.unsetIterator(); } - synchronized void reattach() throws IOException { + public synchronized void reattach() throws IOException { if (tabletReservedReaders.size() != 0) throw new IllegalStateException(); @@ -545,13 +545,13 @@ public class FileManager { } } - synchronized void releaseOpenFiles(boolean sawIOException) { + public synchronized void releaseOpenFiles(boolean sawIOException) { releaseReaders(tabletReservedReaders, sawIOException); tabletReservedReaders.clear(); dataSources.clear(); } - synchronized int getNumOpenFiles() { + public synchronized int getNumOpenFiles() { return tabletReservedReaders.size(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java index dc36718..3c9c32c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java @@ -567,7 +567,7 @@ public class InMemoryMap { } - class MemoryIterator extends WrappingIterator implements InterruptibleIterator { + public class MemoryIterator extends WrappingIterator implements InterruptibleIterator { private AtomicBoolean closed; private SourceSwitchingIterator ssi; http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java new file mode 100644 index 0000000..25cfd9b --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver; + +public enum MinorCompactionReason { + USER, SYSTEM, CLOSE +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactor.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactor.java deleted file mode 100644 index b2e84e5..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactor.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.tserver; - -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import java.util.Random; - -import org.apache.accumulo.core.client.impl.Tables; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; -import org.apache.accumulo.core.master.state.tables.TableState; -import org.apache.accumulo.core.metadata.schema.DataFileValue; -import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.server.client.HdfsZooInstance; -import org.apache.accumulo.server.conf.TableConfiguration; -import org.apache.accumulo.server.fs.FileRef; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.problems.ProblemReport; -import org.apache.accumulo.server.problems.ProblemReports; -import org.apache.accumulo.server.problems.ProblemType; -import org.apache.accumulo.tserver.Tablet.MinorCompactionReason; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.Logger; - -public class MinorCompactor extends Compactor { - - private static final Logger log = Logger.getLogger(MinorCompactor.class); - - private static final Map<FileRef,DataFileValue> EMPTY_MAP = Collections.emptyMap(); - - private static Map<FileRef,DataFileValue> toFileMap(FileRef mergeFile, DataFileValue dfv) { - if (mergeFile == null) - return EMPTY_MAP; - - return Collections.singletonMap(mergeFile, dfv); - } - - MinorCompactor(Configuration conf, VolumeManager fs, InMemoryMap imm, FileRef mergeFile, DataFileValue dfv, FileRef outputFile, TableConfiguration acuTableConf, - KeyExtent extent, MinorCompactionReason mincReason) { - super(conf, fs, toFileMap(mergeFile, dfv), imm, outputFile, true, acuTableConf, extent, new CompactionEnv() { - - @Override - public boolean isCompactionEnabled() { - return true; - } - - @Override - public IteratorScope getIteratorScope() { - return IteratorScope.minc; - } - }); - - super.mincReason = mincReason; - } - - private boolean isTableDeleting() { - try { - return Tables.getTableState(HdfsZooInstance.getInstance(), extent.getTableId().toString()) == TableState.DELETING; - } catch (Exception e) { - log.warn("Failed to determine if table " + extent.getTableId() + " was deleting ", e); - return false; // can not get positive confirmation that its deleting. - } - } - - @Override - public CompactionStats call() { - log.debug("Begin minor compaction " + getOutputFile() + " " + getExtent()); - - // output to new MapFile with a temporary name - int sleepTime = 100; - double growthFactor = 4; - int maxSleepTime = 1000 * 60 * 3; // 3 minutes - boolean reportedProblem = false; - - runningCompactions.add(this); - try { - do { - try { - CompactionStats ret = super.call(); - - // log.debug(String.format("MinC %,d recs in | %,d recs out | %,d recs/sec | %6.3f secs | %,d bytes ",map.size(), entriesCompacted, - // (int)(map.size()/((t2 - t1)/1000.0)), (t2 - t1)/1000.0, estimatedSizeInBytes())); - - if (reportedProblem) { - ProblemReports.getInstance().deleteProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile()); - } - - return ret; - } catch (IOException e) { - log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + " retrying ..."); - ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e)); - reportedProblem = true; - } catch (RuntimeException e) { - // if this is coming from a user iterator, it is possible that the user could change the iterator config and that the - // minor compaction would succeed - log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + " retrying ...", e); - ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e)); - reportedProblem = true; - } catch (CompactionCanceledException e) { - throw new IllegalStateException(e); - } - - Random random = new Random(); - - int sleep = sleepTime + random.nextInt(sleepTime); - log.debug("MinC failed sleeping " + sleep + " ms before retrying"); - UtilWaitThread.sleep(sleep); - sleepTime = (int) Math.round(Math.min(maxSleepTime, sleepTime * growthFactor)); - - // clean up - try { - if (getFileSystem().exists(new Path(getOutputFile()))) { - getFileSystem().deleteRecursively(new Path(getOutputFile())); - } - } catch (IOException e) { - log.warn("Failed to delete failed MinC file " + getOutputFile() + " " + e.getMessage()); - } - - if (isTableDeleting()) - return new CompactionStats(0, 0); - - } while (true); - } finally { - thread = null; - runningCompactions.remove(this); - } - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/Rate.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Rate.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Rate.java deleted file mode 100644 index b0ed9ee..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Rate.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.tserver; - -public class Rate { - private long lastCounter = -1; - private long lastTime = -1; - private double current = 0.0; - final double ratio; - - /** - * Turn a counter into an exponentially smoothed rate over time. - * - * @param ratio - * the rate at which each update influences the curve; must be (0., 1.0) - */ - public Rate(double ratio) { - if (ratio <= 0. || ratio >= 1.0) - throw new IllegalArgumentException("ratio must be > 0. and < 1.0"); - this.ratio = ratio; - } - - public double update(long counter) { - return update(System.currentTimeMillis(), counter); - } - - synchronized public double update(long when, long counter) { - if (lastCounter < 0) { - lastTime = when; - lastCounter = counter; - return current; - } - if (lastTime == when) { - throw new IllegalArgumentException("update time < last value"); - } - double keep = 1. - ratio; - current = (keep * current + ratio * ((counter - lastCounter)) * 1000. / (when - lastTime)); - lastTime = when; - lastCounter = counter; - return current; - } - - synchronized public double rate() { - return this.current; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/RootFiles.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/RootFiles.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/RootFiles.java deleted file mode 100644 index f23c55d..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/RootFiles.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.tserver; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Set; - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.file.FileOperations; -import org.apache.accumulo.server.fs.FileRef; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.Logger; - -/** - * - */ -public class RootFiles { - - private static Logger log = Logger.getLogger(RootFiles.class); - - static void prepareReplacement(VolumeManager fs, Path location, Set<FileRef> oldDatafiles, String compactName) throws IOException { - for (FileRef ref : oldDatafiles) { - Path path = ref.path(); - Tablet.rename(fs, path, new Path(location + "/delete+" + compactName + "+" + path.getName())); - } - } - - static void renameReplacement(VolumeManager fs, FileRef tmpDatafile, FileRef newDatafile) throws IOException { - if (fs.exists(newDatafile.path())) { - log.error("Target map file already exist " + newDatafile, new Exception()); - throw new IllegalStateException("Target map file already exist " + newDatafile); - } - - Tablet.rename(fs, tmpDatafile.path(), newDatafile.path()); - } - - static void finishReplacement(AccumuloConfiguration acuTableConf, VolumeManager fs, Path location, Set<FileRef> oldDatafiles, String compactName) - throws IOException { - // start deleting files, if we do not finish they will be cleaned - // up later - for (FileRef ref : oldDatafiles) { - Path path = ref.path(); - Path deleteFile = new Path(location + "/delete+" + compactName + "+" + path.getName()); - if (acuTableConf.getBoolean(Property.GC_TRASH_IGNORE) || !fs.moveToTrash(deleteFile)) - fs.deleteRecursively(deleteFile); - } - } - - public static void replaceFiles(AccumuloConfiguration acuTableConf, VolumeManager fs, Path location, Set<FileRef> oldDatafiles, FileRef tmpDatafile, - FileRef newDatafile) throws IOException { - String compactName = newDatafile.path().getName(); - - prepareReplacement(fs, location, oldDatafiles, compactName); - renameReplacement(fs, tmpDatafile, newDatafile); - finishReplacement(acuTableConf, fs, location, oldDatafiles, compactName); - } - - public static Collection<String> cleanupReplacement(VolumeManager fs, FileStatus[] files, boolean deleteTmp) throws IOException { - /* - * called in constructor and before major compactions - */ - Collection<String> goodFiles = new ArrayList<String>(files.length); - - for (FileStatus file : files) { - - String path = file.getPath().toString(); - if (file.getPath().toUri().getScheme() == null) { - // depending on the behavior of HDFS, if list status does not return fully qualified volumes then could switch to the default volume - throw new IllegalArgumentException("Require fully qualified paths " + file.getPath()); - } - - String filename = file.getPath().getName(); - - // check for incomplete major compaction, this should only occur - // for root tablet - if (filename.startsWith("delete+")) { - String expectedCompactedFile = path.substring(0, path.lastIndexOf("/delete+")) + "/" + filename.split("\\+")[1]; - if (fs.exists(new Path(expectedCompactedFile))) { - // compaction finished, but did not finish deleting compacted files.. so delete it - if (!fs.deleteRecursively(file.getPath())) - log.warn("Delete of file: " + file.getPath().toString() + " return false"); - continue; - } - // compaction did not finish, so put files back - - // reset path and filename for rest of loop - filename = filename.split("\\+", 3)[2]; - path = path.substring(0, path.lastIndexOf("/delete+")) + "/" + filename; - - Tablet.rename(fs, file.getPath(), new Path(path)); - } - - if (filename.endsWith("_tmp")) { - if (deleteTmp) { - log.warn("cleaning up old tmp file: " + path); - if (!fs.deleteRecursively(file.getPath())) - log.warn("Delete of tmp file: " + file.getPath().toString() + " return false"); - - } - continue; - } - - if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") && !FileOperations.getValidExtensions().contains(filename.split("\\.")[1])) { - log.error("unknown file in tablet" + path); - continue; - } - - goodFiles.add(path); - } - - return goodFiles; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java new file mode 100644 index 0000000..83fc43e --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver; + +import java.util.List; + +import org.apache.accumulo.core.constraints.Violations; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.tserver.tablet.CommitSession; + +public class TConstraintViolationException extends Exception { + private static final long serialVersionUID = 1L; + private final Violations violations; + private final List<Mutation> violators; + private final List<Mutation> nonViolators; + private final CommitSession commitSession; + + public TConstraintViolationException(Violations violations, List<Mutation> violators, List<Mutation> nonViolators, CommitSession commitSession) { + this.violations = violations; + this.violators = violators; + this.nonViolators = nonViolators; + this.commitSession = commitSession; + } + + Violations getViolations() { + return violations; + } + + List<Mutation> getViolators() { + return violators; + } + + List<Mutation> getNonViolators() { + return nonViolators; + } + + CommitSession getCommitSession() { + return commitSession; + } +} \ No newline at end of file