ACCUMULO-3339 extract tablet data needed by a tablet to its own class
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1e2a84f8 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1e2a84f8 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1e2a84f8 Branch: refs/heads/master Commit: 1e2a84f836f3c7f09d221b1e7d819d0fce1bd8f7 Parents: eb4c38f Author: Eric C. Newton <eric.new...@gmail.com> Authored: Mon Jul 6 14:03:15 2015 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Mon Jul 6 14:03:15 2015 -0400 ---------------------------------------------------------------------- .../apache/accumulo/server/fs/VolumeUtil.java | 7 +- .../apache/accumulo/tserver/TabletServer.java | 23 ++- .../accumulo/tserver/tablet/SplitInfo.java | 85 -------- .../apache/accumulo/tserver/tablet/Tablet.java | 204 ++----------------- .../accumulo/tserver/tablet/TabletData.java | 203 ++++++++++++++++++ .../apache/accumulo/test/BalanceFasterIT.java | 2 - 6 files changed, 241 insertions(+), 283 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e2a84f8/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java index 4722e60..7cd0d9e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java @@ -46,7 +46,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -165,13 +164,13 @@ public class VolumeUtil { } } - public static Text switchRootTabletVolume(KeyExtent extent, Text location) throws IOException { + public static String switchRootTabletVolume(KeyExtent extent, String location) throws IOException { if (extent.isRootTablet()) { - String newLocation = switchVolume(location.toString(), FileType.TABLE, ServerConstants.getVolumeReplacements()); + String newLocation = switchVolume(location, FileType.TABLE, ServerConstants.getVolumeReplacements()); if (newLocation != null) { MetadataTableUtil.setRootTabletDir(newLocation); log.info("Volume replaced " + extent + " : " + location + " -> " + newLocation); - return new Text(new Path(newLocation).toString()); + return new Path(newLocation).toString(); } } return location; http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e2a84f8/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index dc382f2..a8be243 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -233,9 +233,9 @@ import org.apache.accumulo.tserver.tablet.Compactor; import org.apache.accumulo.tserver.tablet.KVEntry; import org.apache.accumulo.tserver.tablet.ScanBatch; import org.apache.accumulo.tserver.tablet.Scanner; -import org.apache.accumulo.tserver.tablet.SplitInfo; import org.apache.accumulo.tserver.tablet.Tablet; import org.apache.accumulo.tserver.tablet.TabletClosedException; +import org.apache.accumulo.tserver.tablet.TabletData; import org.apache.commons.collections.map.LRUMap; import org.apache.hadoop.fs.FSError; import org.apache.hadoop.fs.FileSystem; @@ -1876,7 +1876,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { private void splitTablet(Tablet tablet) { try { - TreeMap<KeyExtent,SplitInfo> tabletInfo = splitTablet(tablet, null); + TreeMap<KeyExtent,TabletData> tabletInfo = splitTablet(tablet, null); if (tabletInfo == null) { // either split or compact not both // were not able to split... so see if a major compaction is @@ -1892,10 +1892,10 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } } - private TreeMap<KeyExtent,SplitInfo> splitTablet(Tablet tablet, byte[] splitPoint) throws IOException { + private TreeMap<KeyExtent,TabletData> splitTablet(Tablet tablet, byte[] splitPoint) throws IOException { long t1 = System.currentTimeMillis(); - TreeMap<KeyExtent,SplitInfo> tabletInfo = tablet.split(splitPoint); + TreeMap<KeyExtent,TabletData> tabletInfo = tablet.split(splitPoint); if (tabletInfo == null) { return null; } @@ -1906,11 +1906,11 @@ public class TabletServer extends AccumuloServerContext implements Runnable { Tablet[] newTablets = new Tablet[2]; - Entry<KeyExtent,SplitInfo> first = tabletInfo.firstEntry(); + Entry<KeyExtent,TabletData> first = tabletInfo.firstEntry(); TabletResourceManager newTrm0 = resourceManager.createTabletResourceManager(first.getKey(), getTableConfiguration(first.getKey())); newTablets[0] = new Tablet(TabletServer.this, first.getKey(), newTrm0, first.getValue()); - Entry<KeyExtent,SplitInfo> last = tabletInfo.lastEntry(); + Entry<KeyExtent,TabletData> last = tabletInfo.lastEntry(); TabletResourceManager newTrm1 = resourceManager.createTabletResourceManager(last.getKey(), getTableConfiguration(last.getKey())); newTablets[1] = new Tablet(TabletServer.this, last.getKey(), newTrm1, last.getValue()); @@ -2129,11 +2129,16 @@ public class TabletServer extends AccumuloServerContext implements Runnable { acquireRecoveryMemory(extent); TabletResourceManager trm = resourceManager.createTabletResourceManager(extent, getTableConfiguration(extent)); - + TabletData data; + if (extent.isRootTablet()) { + data = new TabletData(fs, ZooReaderWriter.getInstance()); + } else { + data = new TabletData(extent, fs, tabletsKeyValues.entrySet().iterator()); + } // this opens the tablet file and fills in the endKey in the extent - locationToOpen = VolumeUtil.switchRootTabletVolume(extent, locationToOpen); + data.setDirectory(VolumeUtil.switchRootTabletVolume(extent, data.getDirectory())); - tablet = new Tablet(TabletServer.this, extent, locationToOpen, trm, tabletsKeyValues); + tablet = new Tablet(TabletServer.this, extent, trm, data); // If a minor compaction starts after a tablet opens, this indicates a log recovery occurred. This recovered data must be minor compacted. // There are three reasons to wait for this minor compaction to finish before placing the tablet in online tablets. // http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e2a84f8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java deleted file mode 100644 index 64b6a11..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.tserver.tablet; - -import java.util.Collection; -import java.util.Map; -import java.util.SortedMap; - -import org.apache.accumulo.core.metadata.schema.DataFileValue; -import org.apache.accumulo.server.fs.FileRef; -import org.apache.accumulo.server.master.state.TServerInstance; - -/** - * operations are disallowed while we split which is ok since splitting is fast - * - * a minor compaction should have taken place before calling this so there should be relatively little left to compact - * - * we just need to make sure major compactions aren't occurring if we have the major compactor thread decide who needs splitting we can avoid synchronization - * issues with major compactions - * - */ - -final public class SplitInfo { - private final String dir; - private final SortedMap<FileRef,DataFileValue> datafiles; - private final String time; - private final long initFlushID; - private final long initCompactID; - private final TServerInstance lastLocation; - private final Map<Long, ? extends Collection<FileRef>> bulkImported; - - SplitInfo(String d, SortedMap<FileRef,DataFileValue> dfv, String time, long initFlushID, long initCompactID, TServerInstance lastLocation, - Map<Long, ? extends Collection<FileRef>> bulkImported) { - this.dir = d; - this.datafiles = dfv; - this.time = time; - this.initFlushID = initFlushID; - this.initCompactID = initCompactID; - this.lastLocation = lastLocation; - this.bulkImported = bulkImported; - } - - public String getDir() { - return dir; - } - - public SortedMap<FileRef,DataFileValue> getDatafiles() { - return datafiles; - } - - public String getTime() { - return time; - } - - public long getInitFlushID() { - return initFlushID; - } - - public long getInitCompactID() { - return initCompactID; - } - - public TServerInstance getLastLocation() { - return lastLocation; - } - - public Map<Long, ? extends Collection<FileRef>> getBulkImported() { - return bulkImported; - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e2a84f8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index c0fb918..307044f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -17,8 +17,6 @@ package org.apache.accumulo.tserver.tablet; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN; -import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN; import java.io.ByteArrayInputStream; import java.io.DataInputStream; @@ -79,11 +77,6 @@ import org.apache.accumulo.core.master.thrift.TabletLoadState; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.DataFileValue; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationConfigurationUtil; import org.apache.accumulo.core.security.Authorizations; @@ -96,7 +89,6 @@ import org.apache.accumulo.core.trace.Trace; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.server.AccumuloServerContext; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.FileRef; @@ -166,7 +158,6 @@ import com.google.common.cache.CacheBuilder; */ public class Tablet implements TabletCommitter { static private final Logger log = Logger.getLogger(Tablet.class); - static private final List<LogEntry> NO_LOG_ENTRIES = Collections.emptyList(); private final TabletServer tabletServer; private final KeyExtent extent; @@ -313,167 +304,10 @@ public class Tablet implements TabletCommitter { this.tableConfiguration = tableConfiguration; this.extent = extent; this.configObserver = configObserver; + this.splitCreationTime = 0; } - public Tablet(TabletServer tabletServer, KeyExtent extent, TabletResourceManager trm, SplitInfo info) throws IOException { - this(tabletServer, new Text(info.getDir()), extent, trm, info.getDatafiles(), info.getTime(), info.getInitFlushID(), info.getInitCompactID(), info - .getLastLocation(), info.getBulkImported()); - splitCreationTime = System.currentTimeMillis(); - } - - private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<FileRef,DataFileValue> datafiles, - String time, long initFlushID, long initCompactID, TServerInstance lastLocation, Map<Long, ? extends Collection<FileRef>> bulkImported) throws IOException { - this(tabletServer, extent, location, trm, NO_LOG_ENTRIES, datafiles, time, lastLocation, new HashSet<FileRef>(), initFlushID, initCompactID, bulkImported); - } - - private static String lookupTime(AccumuloConfiguration conf, KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) { - SortedMap<Key,Value> entries; - - if (extent.isRootTablet()) { - return null; - } else { - entries = new TreeMap<Key,Value>(); - Text rowName = extent.getMetadataEntry(); - for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) { - if (entry.getKey().compareRow(rowName) == 0 && TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) { - entries.put(new Key(entry.getKey()), new Value(entry.getValue())); - } - } - } - - if (entries.size() == 1) - return entries.values().iterator().next().toString(); - return null; - } - - private static SortedMap<FileRef,DataFileValue> lookupDatafiles(AccumuloServerContext context, VolumeManager fs, KeyExtent extent, - SortedMap<Key,Value> tabletsKeyValues) throws IOException { - - TreeMap<FileRef,DataFileValue> result = new TreeMap<FileRef,DataFileValue>(); - - if (extent.isRootTablet()) { // the meta0 tablet - Path location = new Path(MetadataTableUtil.getRootTabletDir()); - - // cleanUpFiles() has special handling for delete. files - FileStatus[] files = fs.listStatus(location); - Collection<String> goodPaths = RootFiles.cleanupReplacement(fs, files, true); - for (String good : goodPaths) { - Path path = new Path(good); - String filename = path.getName(); - FileRef ref = new FileRef(location.toString() + "/" + filename, path); - DataFileValue dfv = new DataFileValue(0, 0); - result.put(ref, dfv); - } - } else { - final Text buffer = new Text(); - - for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) { - Key k = entry.getKey(); - k.getColumnFamily(buffer); - // Ignore anything but file: - if (TabletsSection.DataFileColumnFamily.NAME.equals(buffer)) { - FileRef ref = new FileRef(fs, k); - result.put(ref, new DataFileValue(entry.getValue().get())); - } - } - } - return result; - } - - private static List<LogEntry> lookupLogEntries(SortedMap<Key,Value> tabletsKeyValues, AccumuloServerContext context, KeyExtent ke) { - List<LogEntry> result = new ArrayList<LogEntry>(); - - if (ke.isRootTablet()) { - try { - result = MetadataTableUtil.getLogEntries(context, ke); - } catch (Exception ex) { - throw new RuntimeException("Unable to read tablet log entries", ex); - } - } else { - log.debug("Looking at metadata " + tabletsKeyValues); - for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) { - Key key = entry.getKey(); - if (key.getColumnFamily().equals(LogColumnFamily.NAME)) { - result.add(LogEntry.fromKeyValue(key, entry.getValue())); - } - } - } - - log.debug("got " + result + " for logs for " + ke); - return result; - } - - private static Set<FileRef> lookupScanFiles(SortedMap<Key,Value> tabletsKeyValues, VolumeManager fs) throws IOException { - HashSet<FileRef> result = new HashSet<FileRef>(); - - for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) { - Key key = entry.getKey(); - if (key.getColumnFamily().equals(ScanFileColumnFamily.NAME)) { - result.add(new FileRef(fs, key)); - } - } - - return result; - } - - private static long lookupFlushID(SortedMap<Key,Value> tabletsKeyValues) { - for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) { - Key key = entry.getKey(); - if (FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier())) - return Long.parseLong(entry.getValue().toString()); - } - - return -1; - } - - private static long lookupCompactID(SortedMap<Key,Value> tabletsKeyValues) { - for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) { - Key key = entry.getKey(); - if (COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier())) - return Long.parseLong(entry.getValue().toString()); - } - - return -1; - } - - private static TServerInstance lookupLastServer(SortedMap<Key,Value> tabletsKeyValues) { - for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) { - if (entry.getKey().getColumnFamily().compareTo(LastLocationColumnFamily.NAME) == 0) { - return new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier()); - } - } - return null; - } - - private static Map<Long, List<FileRef>> lookupBulkImported(SortedMap<Key,Value> tabletsKeyValues, VolumeManager fs) { - Map<Long,List<FileRef>> result = new HashMap<>(); - for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) { - if (entry.getKey().getColumnFamily().compareTo(BulkFileColumnFamily.NAME) == 0) { - Long id = Long.decode(entry.getValue().toString()); - List<FileRef> lst = result.get(id); - if (lst == null) { - lst = new ArrayList<FileRef>(); - } - lst.add(new FileRef(fs, entry.getKey())); - } - } - return result; - } - - public Tablet(TabletServer tabletServer, KeyExtent extent, Text location, TabletResourceManager trm, SortedMap<Key,Value> tabletsKeyValues) - throws IOException { - this(tabletServer, extent, location, trm, lookupLogEntries(tabletsKeyValues, tabletServer, extent), lookupDatafiles(tabletServer, - tabletServer.getFileSystem(), extent, tabletsKeyValues), lookupTime(tabletServer.getConfiguration(), extent, tabletsKeyValues), - lookupLastServer(tabletsKeyValues), lookupScanFiles(tabletsKeyValues, tabletServer.getFileSystem()), lookupFlushID(tabletsKeyValues), - lookupCompactID(tabletsKeyValues), lookupBulkImported(tabletsKeyValues, tabletServer.getFileSystem())); - } - - /** - * yet another constructor - this one allows us to avoid costly lookups into the Metadata table if we already know the files we need - as at split time - */ - private Tablet(final TabletServer tabletServer, final KeyExtent extent, final Text location, final TabletResourceManager trm, - final List<LogEntry> rawLogEntries, final SortedMap<FileRef,DataFileValue> rawDatafiles, String time, final TServerInstance lastLocation, - final Set<FileRef> scanFiles, final long initFlushID, final long initCompactID, final Map<Long, ? extends Collection<FileRef>> bulkImported) throws IOException { + public Tablet(final TabletServer tabletServer, final KeyExtent extent, final TabletResourceManager trm, TabletData data) throws IOException { TableConfiguration tblConf = tabletServer.getTableConfiguration(extent); if (null == tblConf) { @@ -484,8 +318,10 @@ public class Tablet implements TabletCommitter { this.tableConfiguration = tblConf; - TabletFiles tabletPaths = VolumeUtil.updateTabletVolumes(tabletServer, tabletServer.getLock(), tabletServer.getFileSystem(), extent, new TabletFiles( - location.toString(), rawLogEntries, rawDatafiles), ReplicationConfigurationUtil.isEnabled(extent, this.tableConfiguration)); + TabletFiles tabletPaths = VolumeUtil + .updateTabletVolumes(tabletServer, tabletServer.getLock(), tabletServer.getFileSystem(), extent, + new TabletFiles(data.getDirectory(), data.getLogEntris(), data.getDataFiles()), + ReplicationConfigurationUtil.isEnabled(extent, this.tableConfiguration)); Path locationPath; @@ -499,14 +335,16 @@ public class Tablet implements TabletCommitter { final SortedMap<FileRef,DataFileValue> datafiles = tabletPaths.datafiles; this.location = locationPath; - this.lastLocation = lastLocation; + this.lastLocation = data.getLastLocation(); this.tabletDirectory = tabletPaths.dir; this.extent = extent; this.tabletResources = trm; - this.lastFlushID = initFlushID; - this.lastCompactID = initCompactID; + this.lastFlushID = data.getFlushID(); + this.lastCompactID = data.getCompactID(); + this.splitCreationTime = data.getSplitTime(); + String time = data.getTime(); if (extent.isRootTablet()) { long rtime = Long.MIN_VALUE; @@ -590,8 +428,8 @@ public class Tablet implements TabletCommitter { // Force a load of any per-table properties configObserver.propertiesChanged(); - for (Long key : bulkImported.keySet()) { - this.bulkImported.put(key, new CopyOnWriteArrayList<FileRef>(bulkImported.get(key))); + for (Entry<Long,List<FileRef>> entry : data.getBulkImported().entrySet()) { + this.bulkImported.put(entry.getKey(), new CopyOnWriteArrayList<FileRef>(entry.getValue())); } if (!logEntries.isEmpty()) { @@ -681,7 +519,7 @@ public class Tablet implements TabletCommitter { computeNumEntries(); - getDatafileManager().removeFilesAfterScan(scanFiles); + getDatafileManager().removeFilesAfterScan(data.getScanFiles()); // look for hints of a failure on the previous tablet server if (!logEntries.isEmpty() || needsMajorCompaction(MajorCompactionReason.NORMAL)) { @@ -1628,7 +1466,7 @@ public class Tablet implements TabletCommitter { private boolean sawBigRow = false; private long timeOfLastMinCWhenBigFreakinRowWasSeen = 0; private long timeOfLastImportWhenBigFreakinRowWasSeen = 0; - private long splitCreationTime; + private final long splitCreationTime; private SplitRowSpec findSplitRow(Collection<FileRef> files) { @@ -2218,7 +2056,7 @@ public class Tablet implements TabletCommitter { return majorCompactionQueued.size() > 0; } - public TreeMap<KeyExtent,SplitInfo> split(byte[] sp) throws IOException { + public TreeMap<KeyExtent,TabletData> split(byte[] sp) throws IOException { if (sp != null && extent.getEndRow() != null && extent.getEndRow().equals(new Text(sp))) { throw new IllegalArgumentException(); @@ -2253,7 +2091,7 @@ public class Tablet implements TabletCommitter { synchronized (this) { // java needs tuples ... - TreeMap<KeyExtent,SplitInfo> newTablets = new TreeMap<KeyExtent,SplitInfo>(); + TreeMap<KeyExtent,TabletData> newTablets = new TreeMap<KeyExtent,TabletData>(); long t1 = System.currentTimeMillis(); // choose a split point @@ -2297,14 +2135,14 @@ public class Tablet implements TabletCommitter { String time = tabletTime.getMetadataValue(); MetadataTableUtil.splitTablet(high, extent.getPrevEndRow(), splitRatio, getTabletServer(), getTabletServer().getLock()); - MasterMetadataUtil.addNewTablet(getTabletServer(), low, lowDirectory, getTabletServer().getTabletSession(), lowDatafileSizes, getBulkIngestedFiles(), time, - lastFlushID, lastCompactID, getTabletServer().getLock()); + MasterMetadataUtil.addNewTablet(getTabletServer(), low, lowDirectory, getTabletServer().getTabletSession(), lowDatafileSizes, getBulkIngestedFiles(), + time, lastFlushID, lastCompactID, getTabletServer().getLock()); MetadataTableUtil.finishSplit(high, highDatafileSizes, highDatafilesToRemove, getTabletServer(), getTabletServer().getLock()); log.log(TLevel.TABLET_HIST, extent + " split " + low + " " + high); - newTablets.put(high, new SplitInfo(tabletDirectory, highDatafileSizes, time, lastFlushID, lastCompactID, lastLocation, getBulkIngestedFiles())); - newTablets.put(low, new SplitInfo(lowDirectory, lowDatafileSizes, time, lastFlushID, lastCompactID, lastLocation, getBulkIngestedFiles())); + newTablets.put(high, new TabletData(tabletDirectory, highDatafileSizes, time, lastFlushID, lastCompactID, lastLocation, getBulkIngestedFiles())); + newTablets.put(low, new TabletData(lowDirectory, lowDatafileSizes, time, lastFlushID, lastCompactID, lastLocation, getBulkIngestedFiles())); long t2 = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e2a84f8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java new file mode 100644 index 0000000..a076284 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.tablet; + +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.fate.zookeeper.ZooReader; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.util.MetadataTableUtil; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TabletData { + private static Logger log = LoggerFactory.getLogger(TabletData.class); + + private String time = null; + private SortedMap<FileRef,DataFileValue> dataFiles = new TreeMap<>(); + private List<LogEntry> logEntris = new ArrayList<>(); + private HashSet<FileRef> scanFiles = new HashSet<>(); + private long flushID = -1; + private long compactID = -1; + private TServerInstance lastLocation = null; + private Map<Long,List<FileRef>> bulkImported = new HashMap<>(); + private String directory = null; + private long splitTime = 0; + + // read tablet data from metadata tables + public TabletData(KeyExtent extent, VolumeManager fs, Iterator<Entry<Key,Value>> entries) { + final Text family = new Text(); + Text rowName = extent.getMetadataEntry(); + while (entries.hasNext()) { + Entry<Key,Value> entry = entries.next(); + Key key = entry.getKey(); + Value value = entry.getValue(); + key.getColumnFamily(family); + if (key.compareRow(rowName) != 0) { + log.info("Unexpected metadata table entry for {}: {}", extent, key.getRow()); + continue; + } + if (ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) { + if (time == null) { + time = value.toString(); + } + } else if (DataFileColumnFamily.NAME.equals(family)) { + FileRef ref = new FileRef(fs, key); + dataFiles.put(ref, new DataFileValue(entry.getValue().get())); + } else if (DIRECTORY_COLUMN.hasColumns(key)) { + directory = value.toString(); + } else if (family.equals(LogColumnFamily.NAME)) { + logEntris.add(LogEntry.fromKeyValue(key, entry.getValue())); + } else if (family.equals(ScanFileColumnFamily.NAME)) { + scanFiles.add(new FileRef(fs, key)); + } else if (FLUSH_COLUMN.hasColumns(key)) { + flushID = Long.parseLong(value.toString()); + } else if (COMPACT_COLUMN.hasColumns(key)) { + compactID = Long.parseLong(entry.getValue().toString()); + } else if (family.equals(LastLocationColumnFamily.NAME)) { + lastLocation = new TServerInstance(value, key.getColumnQualifier()); + } else if (family.equals(BulkFileColumnFamily.NAME)) { + Long id = Long.decode(value.toString()); + List<FileRef> lst = bulkImported.get(id); + if (lst == null) { + bulkImported.put(id, lst = new ArrayList<>()); + } + lst.add(new FileRef(fs, key)); + } else if (PREV_ROW_COLUMN.hasColumns(key)) { + KeyExtent check = new KeyExtent(key.getRow(), value); + if (!check.equals(extent)) { + throw new RuntimeException("Found bad entry for " + extent + ": " + check); + } + } + } + } + + // read basic root table metadata from zookeeper + public TabletData(VolumeManager fs, ZooReader rdr) throws IOException { + Path location = new Path(MetadataTableUtil.getRootTabletDir()); + + // cleanUpFiles() has special handling for deleting files + FileStatus[] files = fs.listStatus(location); + Collection<String> goodPaths = RootFiles.cleanupReplacement(fs, files, true); + for (String good : goodPaths) { + Path path = new Path(good); + String filename = path.getName(); + FileRef ref = new FileRef(location.toString() + "/" + filename, path); + DataFileValue dfv = new DataFileValue(0, 0); + dataFiles.put(ref, dfv); + } + try { + logEntris = MetadataTableUtil.getLogEntries(null, RootTable.EXTENT); + } catch (Exception ex) { + throw new RuntimeException("Unable to read tablet log entries", ex); + } + directory = MetadataTableUtil.getRootTabletDir(); + } + + // split + public TabletData(String tabletDirectory, SortedMap<FileRef,DataFileValue> highDatafileSizes, String time, long lastFlushID, long lastCompactID, + TServerInstance lastLocation, Map<Long,List<FileRef>> bulkIngestedFiles) { + this.directory = tabletDirectory; + this.dataFiles = highDatafileSizes; + this.time = time; + this.flushID = lastFlushID; + this.compactID = lastCompactID; + this.lastLocation = lastLocation; + this.bulkImported = bulkIngestedFiles; + this.splitTime = System.currentTimeMillis(); + } + + public static Logger getLog() { + return log; + } + + public String getTime() { + return time; + } + + public SortedMap<FileRef,DataFileValue> getDataFiles() { + return dataFiles; + } + + public List<LogEntry> getLogEntris() { + return logEntris; + } + + public HashSet<FileRef> getScanFiles() { + return scanFiles; + } + + public long getFlushID() { + return flushID; + } + + public long getCompactID() { + return compactID; + } + + public TServerInstance getLastLocation() { + return lastLocation; + } + + public Map<Long,List<FileRef>> getBulkImported() { + return bulkImported; + } + + public String getDirectory() { + return directory; + } + + public void setDirectory(String directory) { + this.directory = directory; + } + + public long getSplitTime() { + return splitTime; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e2a84f8/test/src/main/java/org/apache/accumulo/test/BalanceFasterIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/BalanceFasterIT.java b/test/src/main/java/org/apache/accumulo/test/BalanceFasterIT.java index 4418fe7..ab70224 100644 --- a/test/src/main/java/org/apache/accumulo/test/BalanceFasterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/BalanceFasterIT.java @@ -45,8 +45,6 @@ import org.junit.Test; // ACCUMULO-2952 public class BalanceFasterIT extends ConfigurableMacBase { - - @Override public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { cfg.setNumTservers(3);