This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new bfe173e Use Ample for tablet location and tserver suspension (#1653) bfe173e is described below commit bfe173ec140f07086b0ff8940fc08d640b9539a7 Author: Jeffrey Manno <jeffreymann...@gmail.com> AuthorDate: Wed Sep 16 14:38:20 2020 -0400 Use Ample for tablet location and tserver suspension (#1653) Use Ample for tablet location updates and for placing / clearing tserver suspension information --- .../accumulo/core/metadata/schema/Ample.java | 4 + .../server/master/state/MetaDataStateStore.java | 104 ++++++--------------- .../server/master/state/SuspendingTServer.java | 15 +-- .../server/master/state/TServerInstance.java | 36 ------- .../server/metadata/TabletMutatorBase.java | 19 ++++ .../test/MasterRepairsDualAssignmentIT.java | 24 +++-- .../accumulo/test/functional/SplitRecoveryIT.java | 12 +-- 7 files changed, 68 insertions(+), 146 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index a7039d4..bd3eb09 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -222,6 +222,10 @@ public interface Ample { public TabletMutator putChopped(); + public TabletMutator putSuspension(TServer tserver, long suspensionTime); + + public TabletMutator deleteSuspension(); + /** * This method persist (or queues for persisting) previous put and deletes against this object. * Unless this method is called, previous calls will never be persisted. The purpose of this diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java index 6061f56..99f8a61 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java @@ -21,31 +21,27 @@ package org.apache.accumulo.server.master.state; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.hadoop.fs.Path; class MetaDataStateStore implements TabletStateStore { - private static final int THREADS = 4; - private static final int LATENCY = 1000; - private static final int MAX_MEMORY = 200 * 1024 * 1024; - protected final ClientContext context; protected final CurrentState state; private final String targetTableName; + private final Ample ample; protected MetaDataStateStore(ClientContext context, CurrentState state, String targetTableName) { this.context = context; this.state = state; + this.ample = context.getAmple(); this.targetTableName = targetTableName; } @@ -58,57 +54,28 @@ class MetaDataStateStore implements TabletStateStore { return new MetaDataTableScanner(context, TabletsSection.getRange(), state, targetTableName); } - @Override public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException { - BatchWriter writer = createBatchWriter(); - try { + try (var tabletsMutator = ample.mutateTablets()) { for (Assignment assignment : assignments) { - Mutation m = new Mutation(assignment.tablet.toMetaRow()); - assignment.server.putLocation(m); - assignment.server.clearFutureLocation(m); - SuspendingTServer.clearSuspension(m); - writer.addMutation(m); + tabletsMutator.mutateTablet(assignment.tablet) + .putLocation(assignment.server, LocationType.CURRENT) + .deleteLocation(assignment.server, LocationType.FUTURE).deleteSuspension().mutate(); } - } catch (Exception ex) { + } catch (RuntimeException ex) { throw new DistributedStoreException(ex); - } finally { - try { - writer.close(); - } catch (MutationsRejectedException e) { - throw new DistributedStoreException(e); - } - } - } - - BatchWriter createBatchWriter() { - try { - return context.createBatchWriter(targetTableName, - new BatchWriterConfig().setMaxMemory(MAX_MEMORY) - .setMaxLatency(LATENCY, TimeUnit.MILLISECONDS).setMaxWriteThreads(THREADS)); - } catch (Exception e) { - throw new RuntimeException(e); } } @Override public void setFutureLocations(Collection<Assignment> assignments) throws DistributedStoreException { - BatchWriter writer = createBatchWriter(); - try { + try (var tabletsMutator = ample.mutateTablets()) { for (Assignment assignment : assignments) { - Mutation m = new Mutation(assignment.tablet.toMetaRow()); - SuspendingTServer.clearSuspension(m); - assignment.server.putFutureLocation(m); - writer.addMutation(m); + tabletsMutator.mutateTablet(assignment.tablet).deleteSuspension() + .putLocation(assignment.server, LocationType.FUTURE).mutate(); } - } catch (Exception ex) { + } catch (RuntimeException ex) { throw new DistributedStoreException(ex); - } finally { - try { - writer.close(); - } catch (MutationsRejectedException e) { - throw new DistributedStoreException(e); - } } } @@ -128,67 +95,49 @@ class MetaDataStateStore implements TabletStateStore { private void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) throws DistributedStoreException { - BatchWriter writer = createBatchWriter(); - try { + try (var tabletsMutator = ample.mutateTablets()) { for (TabletLocationState tls : tablets) { - Mutation m = new Mutation(tls.extent.toMetaRow()); + TabletMutator tabletMutator = tabletsMutator.mutateTablet(tls.extent); if (tls.current != null) { - tls.current.clearLocation(m); + tabletMutator.deleteLocation(tls.current, LocationType.CURRENT); if (logsForDeadServers != null) { List<Path> logs = logsForDeadServers.get(tls.current); if (logs != null) { for (Path log : logs) { LogEntry entry = new LogEntry(tls.extent, 0, tls.current.hostPort(), log.toString()); - m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue()); + tabletMutator.putWal(entry); } } } if (suspensionTimestamp >= 0) { - SuspendingTServer suspender = - new SuspendingTServer(tls.current.getLocation(), suspensionTimestamp); - suspender.setSuspension(m); + tabletMutator.putSuspension(tls.current, suspensionTimestamp); } } if (tls.suspend != null && suspensionTimestamp < 0) { - SuspendingTServer.clearSuspension(m); + tabletMutator.deleteSuspension(); } if (tls.future != null) { - tls.future.clearFutureLocation(m); + tabletMutator.deleteLocation(tls.future, LocationType.FUTURE); } - writer.addMutation(m); + tabletMutator.mutate(); } - } catch (Exception ex) { + } catch (RuntimeException ex) { throw new DistributedStoreException(ex); - } finally { - try { - writer.close(); - } catch (MutationsRejectedException e) { - throw new DistributedStoreException(e); - } } } @Override public void unsuspend(Collection<TabletLocationState> tablets) throws DistributedStoreException { - BatchWriter writer = createBatchWriter(); - try { + try (var tabletsMutator = ample.mutateTablets()) { for (TabletLocationState tls : tablets) { if (tls.suspend != null) { continue; } - Mutation m = new Mutation(tls.extent.toMetaRow()); - SuspendingTServer.clearSuspension(m); - writer.addMutation(m); + tabletsMutator.mutateTablet(tls.extent).deleteSuspension().mutate(); } - } catch (Exception ex) { + } catch (RuntimeException ex) { throw new DistributedStoreException(ex); - } finally { - try { - writer.close(); - } catch (MutationsRejectedException e) { - throw new DistributedStoreException(e); - } } } @@ -196,5 +145,4 @@ class MetaDataStateStore implements TabletStateStore { public String name() { return "Normal Tablets"; } - } diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java index c6bb343..75bbe7a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java @@ -18,11 +18,8 @@ */ package org.apache.accumulo.server.master.state; -import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN; - import java.util.Objects; -import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.util.HostAndPort; @@ -44,8 +41,8 @@ public class SuspendingTServer { return new SuspendingTServer(HostAndPort.fromString(parts[0]), Long.parseLong(parts[1])); } - public Value toValue() { - return new Value(server + "|" + suspensionTime); + public static Value toValue(HostAndPort tServer, long suspensionTime) { + return new Value(tServer + "|" + suspensionTime); } @Override @@ -57,14 +54,6 @@ public class SuspendingTServer { return server.equals(rhs.server) && suspensionTime == rhs.suspensionTime; } - public void setSuspension(Mutation m) { - m.put(SUSPEND_COLUMN.getColumnFamily(), SUSPEND_COLUMN.getColumnQualifier(), toValue()); - } - - public static void clearSuspension(Mutation m) { - m.putDelete(SUSPEND_COLUMN.getColumnFamily(), SUSPEND_COLUMN.getColumnQualifier()); - } - @Override public int hashCode() { return Objects.hash(server, suspensionTime); diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java index a3cd4ab..d5e40c5 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java @@ -25,12 +25,8 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; -import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.metadata.schema.Ample; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.core.util.HostAndPort; @@ -83,30 +79,6 @@ public class TServerInstance implements Ample.TServer, Comparable<TServerInstanc this(location.getHostAndPort(), location.getSession()); } - public void putLocation(Mutation m) { - m.put(CurrentLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue()); - } - - public void putFutureLocation(Mutation m) { - m.put(FutureLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue()); - } - - public void putLastLocation(Mutation m) { - m.put(LastLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue()); - } - - public void clearLastLocation(Mutation m) { - m.putDelete(LastLocationColumnFamily.NAME, asColumnQualifier()); - } - - public void clearFutureLocation(Mutation m) { - m.putDelete(FutureLocationColumnFamily.NAME, asColumnQualifier()); - } - - public void clearLocation(Mutation m) { - m.putDelete(CurrentLocationColumnFamily.NAME, asColumnQualifier()); - } - @Override public int compareTo(TServerInstance other) { if (this == other) @@ -140,14 +112,6 @@ public class TServerInstance implements Ample.TServer, Comparable<TServerInstanc return getLocation().toString(); } - private Text asColumnQualifier() { - return new Text(this.getSession()); - } - - private Value asMutationValue() { - return new Value(getLocation().toString()); - } - @Override public HostAndPort getLocation() { return location; diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java index 3bb3ebb..e518cec 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java @@ -34,6 +34,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.La import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataTime; import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; @@ -41,6 +42,7 @@ import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.fate.FateTxId; import org.apache.accumulo.fate.zookeeper.ZooLock; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.master.state.SuspendingTServer; import org.apache.hadoop.io.Text; import com.google.common.base.Preconditions; @@ -202,6 +204,23 @@ public abstract class TabletMutatorBase implements Ample.TabletMutator { return this; } + @Override + public Ample.TabletMutator putSuspension(Ample.TServer tServer, long suspensionTime) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + mutation.put(SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily(), + SuspendLocationColumn.SUSPEND_COLUMN.getColumnQualifier(), + SuspendingTServer.toValue(tServer.getLocation(), suspensionTime)); + return this; + } + + @Override + public Ample.TabletMutator deleteSuspension() { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + mutation.putDelete(SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily(), + SuspendLocationColumn.SUSPEND_COLUMN.getColumnQualifier()); + return this; + } + protected Mutation getMutation() { updatesEnabled = false; return mutation; diff --git a/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java b/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java index 473295a..26b7a97 100644 --- a/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java +++ b/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java @@ -28,20 +28,20 @@ import java.util.TreeSet; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.master.state.ClosableIterator; import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.master.state.TabletLocationState; @@ -74,6 +74,7 @@ public class MasterRepairsDualAssignmentIT extends ConfigurableMacBase { // make some tablets, spread 'em around try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { ClientContext context = (ClientContext) c; + ServerContext serverContext = cluster.getServerContext(); String table = this.getUniqueNames(1)[0]; c.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE); @@ -134,19 +135,16 @@ public class MasterRepairsDualAssignmentIT extends ConfigurableMacBase { } assertNotEquals(null, moved); // throw a mutation in as if we were the dying tablet - BatchWriter bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); - Mutation assignment = new Mutation(moved.extent.toMetaRow()); - moved.current.putLocation(assignment); - bw.addMutation(assignment); - bw.close(); + TabletMutator tabletMutator = serverContext.getAmple().mutateTablet(moved.extent); + tabletMutator.putLocation(moved.current, LocationType.CURRENT); + tabletMutator.mutate(); // wait for the master to fix the problem waitForCleanStore(store); // now jam up the metadata table - bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); - assignment = new Mutation(new KeyExtent(MetadataTable.ID, null, null).toMetaRow()); - moved.current.putLocation(assignment); - bw.addMutation(assignment); - bw.close(); + tabletMutator = + serverContext.getAmple().mutateTablet(new KeyExtent(MetadataTable.ID, null, null)); + tabletMutator.putLocation(moved.current, LocationType.CURRENT); + tabletMutator.mutate(); waitForCleanStore(TabletStateStore.getStoreForLevel(DataLevel.METADATA, context)); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java index 1b81d68..9bb109f 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java @@ -37,10 +37,8 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.clientImpl.ScannerImpl; -import org.apache.accumulo.core.clientImpl.Writer; import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -48,6 +46,7 @@ import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TabletFile; +import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; @@ -58,6 +57,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Se import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataTime; import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.fate.zookeeper.ZooLock; @@ -206,11 +206,11 @@ public class SplitRecoveryIT extends ConfigurableMacBase { MetadataTableUtil.splitTablet(high, extent.prevEndRow(), splitRatio, context, zl); TServerInstance instance = new TServerInstance(location, zl.getSessionId()); - Writer writer = MetadataTableUtil.getMetadataTable(context); Assignment assignment = new Assignment(high, instance); - Mutation m = new Mutation(assignment.tablet.toMetaRow()); - assignment.server.putFutureLocation(m); - writer.update(m); + + TabletMutator tabletMutator = context.getAmple().mutateTablet(extent); + tabletMutator.putLocation(assignment.server, LocationType.FUTURE); + tabletMutator.mutate(); if (steps >= 1) { Map<Long,List<TabletFile>> bulkFiles = getBulkFilesLoaded(context, extent);