http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletClosedException.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletClosedException.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletClosedException.java new file mode 100644 index 0000000..827b803 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletClosedException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.tablet; + +public class TabletClosedException extends RuntimeException { + public TabletClosedException(Exception e) { + super(e); + } + + public TabletClosedException() { + super(); + } + + private static final long serialVersionUID = 1L; +} \ No newline at end of file
// Patch source: http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
// (new file in the patch, blob bd87a5b)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.tserver.tablet;

import java.util.Collection;
import java.util.List;

import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.tserver.InMemoryMap;
import org.apache.accumulo.tserver.log.DfsLogger;

/**
 * The view of a tablet that {@link TabletMemory} holds and that it passes to every {@link CommitSession} it creates.
 * Only the calls made by {@code TabletMemory} are visible alongside this interface; the contracts of the remaining
 * methods should be confirmed against the implementing class.
 */
public interface TabletCommitter {

  void abortCommit(CommitSession commitSession, List<Mutation> value);

  void commit(CommitSession commitSession, List<Mutation> mutations);

  boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> copy, boolean mincFinish);

  void finishUpdatingLogsUsed();

  /**
   * @return the configuration that {@code TabletMemory} hands to each new {@link InMemoryMap}
   */
  TableConfiguration getTableConfiguration();

  KeyExtent getExtent();

  int getLogId();

  boolean getUseWAL();

  /**
   * Reports memory usage. {@code TabletMemory} passes the estimated size of its active in-memory map as the first
   * argument and, as the second, the estimated size of the map that is being minor compacted or deleted (0 when there is
   * none).
   */
  void updateMemoryUsageStats(long estimatedSizeInBytes, long estimatedSizeInBytes2);

}

// Patch source: http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java
// (new file in the patch, blob 155d6b5)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.tserver.tablet;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
import org.apache.accumulo.tserver.InMemoryMap;
import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
import org.apache.log4j.Logger;

/**
 * Tracks the in-memory maps of one tablet through a minor compaction ("MinC").
 *
 * <p>
 * Three references are kept: {@code memTable} (the active map), {@code otherMemTable} (set by {@link #prepareForMinC()},
 * cleared by {@link #finishedMinC()}) and {@code deletingMemTable} (set by {@link #finishedMinC()}, cleared by
 * {@link #finalizeMinC()}). A {@link CommitSession} is paired with the active map; once {@link #close()} has run the
 * session is {@code null} and the MinC life-cycle methods throw {@link IllegalStateException}.
 *
 * <p>
 * Apart from the block inside {@link #finalizeMinC()}, this class does no locking of its own. {@link #finishedMinC()}
 * and {@link #waitForMinC()} call {@code notifyAll()} / {@code wait()} on the tablet object, so their callers must
 * already hold that object's monitor.
 */
class TabletMemory implements Closeable {
  private static final Logger log = Logger.getLogger(TabletMemory.class);

  private final TabletCommitter tablet;
  private InMemoryMap memTable;
  private InMemoryMap otherMemTable;
  private InMemoryMap deletingMemTable;
  private int nextSeq = 1;
  private CommitSession commitSession;

  /**
   * Creates the initial in-memory map and its commit session.
   *
   * @param tablet
   *          supplies the table configuration, receives memory statistics and is the monitor used for wait/notify
   * @throws RuntimeException
   *           wrapping a {@link LocalityGroupConfigurationError} raised while building the map
   */
  TabletMemory(TabletCommitter tablet) {
    this.tablet = tablet;
    memTable = newMemTable();
    commitSession = new CommitSession(tablet, nextSeq, memTable);
    // NOTE(review): the sequence advances by two per session; the reason is not visible in this file.
    nextSeq += 2;
  }

  /** Builds an in-memory map from the tablet's table configuration, converting the checked configuration error. */
  private InMemoryMap newMemTable() {
    try {
      return new InMemoryMap(tablet.getTableConfiguration());
    } catch (LocalityGroupConfigurationError e) {
      throw new RuntimeException(e);
    }
  }

  /** Throws if {@link #close()} has already discarded the commit session. */
  private void checkOpen() {
    if (commitSession == null) {
      throw new IllegalStateException("tablet memory is closed (no commit session)");
    }
  }

  /**
   * @return the active in-memory map
   */
  public InMemoryMap getMemTable() {
    return memTable;
  }

  /**
   * @return the map set aside by {@link #prepareForMinC()}, or {@code null} when there is none
   */
  public InMemoryMap getMinCMemTable() {
    return otherMemTable;
  }

  /**
   * Sets the active map aside as {@code otherMemTable}, installs a fresh active map with a new commit session and
   * reports the sizes of both maps to the tablet.
   *
   * @return the commit session that was paired with the map that has been set aside
   * @throws IllegalStateException
   *           if a map is already set aside or awaiting deletion, or if this object is closed
   * @throws RuntimeException
   *           wrapping a {@link LocalityGroupConfigurationError} raised while building the new map; in that case no
   *           state has been changed
   */
  public CommitSession prepareForMinC() {
    if (otherMemTable != null) {
      throw new IllegalStateException("otherMemTable is already set");
    }

    if (deletingMemTable != null) {
      throw new IllegalStateException("deletingMemTable is still set");
    }
    checkOpen();

    // Build the replacement before touching any field: if construction fails, memTable and otherMemTable must not be
    // left pointing at the same map.
    InMemoryMap replacement = newMemTable();
    otherMemTable = memTable;
    memTable = replacement;

    CommitSession oldCommitSession = commitSession;
    commitSession = new CommitSession(tablet, nextSeq, memTable);
    nextSeq += 2;

    tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), otherMemTable.estimatedSizeInBytes());

    return oldCommitSession;
  }

  /**
   * Moves the set-aside map to {@code deletingMemTable} and wakes every thread waiting on the tablet object.
   *
   * <p>
   * The caller must hold the tablet object's monitor; {@link Object#notifyAll()} throws
   * {@link IllegalMonitorStateException} otherwise.
   *
   * @throws IllegalStateException
   *           if no map is set aside, a map is already awaiting deletion, or this object is closed
   */
  public void finishedMinC() {

    if (otherMemTable == null) {
      throw new IllegalStateException("otherMemTable is not set");
    }

    if (deletingMemTable != null) {
      throw new IllegalStateException("deletingMemTable is still set");
    }

    checkOpen();

    deletingMemTable = otherMemTable;

    otherMemTable = null;
    tablet.notifyAll();
  }

  /**
   * Deletes the map parked by {@link #finishedMinC()}, then, under the tablet monitor, clears the reference and
   * reports the active map's size (with 0 for the second value) to the tablet.
   *
   * @throws IllegalStateException
   *           if this object is closed, no map is awaiting deletion, or a map has been set aside again in the meantime
   */
  public void finalizeMinC() {
    checkOpen();
    // Fail directly instead of via a NullPointerException that the finally block would replace with this same
    // exception type.
    if (deletingMemTable == null) {
      throw new IllegalStateException("deletingMemTable is not set");
    }
    try {
      // NOTE(review): 15000 is passed straight to InMemoryMap.delete; presumably a wait time in ms - confirm.
      deletingMemTable.delete(15000);
    } finally {
      // NOTE(review): an IllegalStateException thrown in here replaces any exception raised by delete().
      synchronized (tablet) {
        if (otherMemTable != null) {
          throw new IllegalStateException("otherMemTable is already set");
        }

        if (deletingMemTable == null) {
          throw new IllegalStateException("deletingMemTable is not set");
        }

        deletingMemTable = null;

        tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0);
      }
    }
  }

  /**
   * @return {@code true} while a map is set aside or awaiting deletion
   */
  public boolean memoryReservedForMinC() {
    return otherMemTable != null || deletingMemTable != null;
  }

  /**
   * Blocks until no map is set aside or awaiting deletion, re-checking after waits of at most 50 ms on the tablet object.
   *
   * <p>
   * The caller must hold the tablet object's monitor; {@link Object#wait(long)} throws
   * {@link IllegalMonitorStateException} otherwise. An interrupt does not end the wait: it is logged and the thread's
   * interrupt status is restored once the wait is over, so callers can still observe it.
   */
  public void waitForMinC() {
    boolean interrupted = false;
    try {
      while (otherMemTable != null || deletingMemTable != null) {
        try {
          tablet.wait(50);
        } catch (InterruptedException e) {
          // Re-interrupting here would make the next wait() throw immediately and turn this loop into a busy spin, so
          // the flag is restored only after the loop.
          interrupted = true;
          log.warn(e, e);
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Applies {@code mutations} through the given commit session.
   */
  public void mutate(CommitSession cm, List<Mutation> mutations) {
    cm.mutate(mutations);
  }

  /**
   * Reports the estimated size of the active map, together with the size of the set-aside map or, failing that, of the
   * map awaiting deletion (0 when neither exists), to the tablet.
   */
  public void updateMemoryUsageStats() {
    long other = 0;
    if (otherMemTable != null) {
      other = otherMemTable.estimatedSizeInBytes();
    } else if (deletingMemTable != null) {
      other = deletingMemTable.estimatedSizeInBytes();
    }

    tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other);
  }

  /**
   * @return a new list holding an iterator over the active map and, when one is set aside, an iterator over that map
   *         too; hand the list back to {@link #returnIterators(List)} to close them
   */
  public List<MemoryIterator> getIterators() {
    List<MemoryIterator> toReturn = new ArrayList<MemoryIterator>(2);
    toReturn.add(memTable.skvIterator());
    if (otherMemTable != null) {
      toReturn.add(otherMemTable.skvIterator());
    }
    return toReturn;
  }

  /**
   * Closes every iterator in {@code iters}.
   */
  public void returnIterators(List<MemoryIterator> iters) {
    for (MemoryIterator iter : iters) {
      iter.close();
    }
  }

  /**
   * @return the entry count of the active map plus, when one is set aside, the entry count of that map
   */
  public long getNumEntries() {
    long entries = memTable.getNumEntries();
    if (otherMemTable != null) {
      entries += otherMemTable.getNumEntries();
    }
    return entries;
  }

  /**
   * @return the commit session paired with the active map, or {@code null} after {@link #close()}
   */
  public CommitSession getCommitSession() {
    return commitSession;
  }

  /**
   * Marks this object closed by dropping the commit session. The in-memory maps themselves are not touched here.
   */
  @Override
  public void close() throws IOException {
    commitSession = null;
  }

  /**
   * @return {@code true} once {@link #close()} has been called
   */
  public boolean isClosed() {
    return commitSession == null;
  }
}

// ---- Remaining patch text that shared these source lines, kept as-is (reflowed one diff line per comment) ----
// http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java
// ----------------------------------------------------------------------
// diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java
// index c5c3316..253c97e 100644
// --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java
// +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java
// @@ -26,7 +26,7 @@ import org.apache.accumulo.core.data.Key;
//  import org.apache.accumulo.core.data.Range;
//  import org.apache.accumulo.core.data.Value;
//  import org.apache.accumulo.core.iterators.SortedMapIterator;
// -import org.apache.accumulo.tserver.Compactor.CountingIterator;
// +import org.apache.accumulo.tserver.tablet.Compactor.CountingIterator;
//  import org.junit.Assert;
//  import org.junit.Test;
//
// http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java
// ----------------------------------------------------------------------
// diff --git
a/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java index f216e93..7cfe65c 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.tserver.tablet.RootFiles; import org.apache.hadoop.fs.Path; import org.junit.Assert; import org.junit.Rule;