This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 1aa9a1b370 Modified InstanceOperations.getActiveCompactions for Compactors (#4845) 1aa9a1b370 is described below commit 1aa9a1b3704a036c5aa95562f453eb3899659c27 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu Aug 29 11:46:15 2024 -0400 Modified InstanceOperations.getActiveCompactions for Compactors (#4845) Modified InstanceOperations.getActiveCompactions to handle addresses for Compactors in addition to TabletServers. Added InstanceOperations method to get Compactor addresses, it was missing. --- .../core/client/admin/InstanceOperations.java | 19 +++-- .../accumulo/core/clientImpl/ClientContext.java | 9 +++ .../core/clientImpl/InstanceOperationsImpl.java | 46 ++++++++--- .../accumulo/core/clientImpl/TabletLocator.java | 6 +- .../core/clientImpl/TabletServerBatchWriter.java | 6 +- .../core/clientImpl/ZookeeperLockChecker.java | 5 ++ .../shell/commands/ListCompactionsCommand.java | 3 +- .../test/compaction/ExternalCompaction_1_IT.java | 91 ++++++++++++++++++++++ .../accumulo/test/functional/CompactionIT.java | 82 +++++++++++++++++++ 9 files changed, 240 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java index 2e1c0c92ae..033dacbe94 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java @@ -164,6 +164,14 @@ public interface InstanceOperations { */ List<String> getManagerLocations(); + /** + * Returns the locations of the active compactors + * + * @return A set of currently active compactors. + * @since 2.1.4 + */ + Set<String> getCompactors(); + /** * Returns the locations of the active scan servers * @@ -189,13 +197,12 @@ public interface InstanceOperations { throws AccumuloException, AccumuloSecurityException; /** - * List the active compaction running on a tablet server. Using this method with - * {@link #getTabletServers()} will only show compactions running on tservers, leaving out any - * external compactions running on compactors. Use {@link #getActiveCompactions()} to get a list - * of all compactions running on tservers and compactors. + * List the active compaction running on a TabletServer or Compactor. The server address can be + * retrieved using {@link #getCompactors()} or {@link #getTabletServers()}. Use + * {@link #getActiveCompactions()} to get a list of all compactions running on tservers and + * compactors. Implementation updated in 2.1.4 to accept a compactor address. * - * @param tserver The tablet server address. This should be of the form - * {@code <ip address>:<port>} + * @param tserver The server address. This should be of the form {@code <ip address>:<port>} * @return the list of active compactions * @since 1.5.0 */ diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index b6c30891b6..03bd7b66bc 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -142,6 +142,7 @@ public class ClientContext implements AccumuloClient { private final Supplier<ScanServerSelector> scanServerSelectorSupplier; private TCredentials rpcCreds; private ThriftTransportPool thriftTransportPool; + private ZookeeperLockChecker zkLockChecker; private volatile boolean closed = false; @@ -1124,4 +1125,12 @@ public class ClientContext implements AccumuloClient { return thriftTransportPool; } + public synchronized ZookeeperLockChecker getTServerLockChecker() { + ensureOpen(); + if (this.zkLockChecker == null) { + this.zkLockChecker = new ZookeeperLockChecker(this); + } + return this.zkLockChecker; + } + } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java index 8c73dab8e5..ef316defd9 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java @@ -33,6 +33,7 @@ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.INSTANCE_OPS import java.util.ArrayList; import java.util.Collections; import java.util.ConcurrentModificationException; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -211,6 +212,15 @@ public class InstanceOperationsImpl implements InstanceOperations { return context.getManagerLocations(); } + @Override + public Set<String> getCompactors() { + Set<String> compactors = new HashSet<>(); + ExternalCompactionUtil.getCompactorAddrs(context).values().forEach(addrs -> { + addrs.forEach(hp -> compactors.add(hp.toString())); + }); + return compactors; + } + @Override public Set<String> getScanServers() { return Set.copyOf(context.getScanServers().keySet()); @@ -271,26 +281,37 @@ public class InstanceOperationsImpl implements InstanceOperations { } @Override - public List<ActiveCompaction> getActiveCompactions(String tserver) + public List<ActiveCompaction> getActiveCompactions(String server) throws AccumuloException, AccumuloSecurityException { - final var parsedTserver = HostAndPort.fromString(tserver); - Client client = null; - try { - client = getClient(ThriftClientTypes.TABLET_SERVER, parsedTserver, context); + final var serverHostAndPort = HostAndPort.fromString(server); - List<ActiveCompaction> as = new ArrayList<>(); - for (var tac : client.getActiveCompactions(TraceUtil.traceInfo(), context.rpcCreds())) { - as.add(new ActiveCompactionImpl(context, tac, parsedTserver, CompactionHost.Type.TSERVER)); + final List<ActiveCompaction> as = new ArrayList<>(); + try { + if (context.getTServerLockChecker().doesTabletServerLockExist(server)) { + Client client = null; + try { + client = getClient(ThriftClientTypes.TABLET_SERVER, serverHostAndPort, context); + for (var tac : client.getActiveCompactions(TraceUtil.traceInfo(), context.rpcCreds())) { + as.add(new ActiveCompactionImpl(context, tac, serverHostAndPort, + CompactionHost.Type.TSERVER)); + } + } finally { + if (client != null) { + returnClient(client, context); + } + } + } else { + // if not a TabletServer address, maybe it's a Compactor + for (var tac : ExternalCompactionUtil.getActiveCompaction(serverHostAndPort, context)) { + as.add(new ActiveCompactionImpl(context, tac, serverHostAndPort, + CompactionHost.Type.COMPACTOR)); + } } return as; } catch (ThriftSecurityException e) { throw new AccumuloSecurityException(e.user, e.code, e); } catch (TException e) { throw new AccumuloException(e); - } finally { - if (client != null) { - returnClient(client, context); - } } } @@ -392,4 +413,5 @@ public class InstanceOperationsImpl implements InstanceOperations { public InstanceId getInstanceId() { return context.getInstanceID(); } + } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java index 2f8e31b8ac..c4c1dcdcd9 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java @@ -141,13 +141,13 @@ public abstract class TabletLocator { MetadataLocationObtainer mlo = new MetadataLocationObtainer(); if (RootTable.ID.equals(tableId)) { - tl = new RootTabletLocator(new ZookeeperLockChecker(context)); + tl = new RootTabletLocator(context.getTServerLockChecker()); } else if (MetadataTable.ID.equals(tableId)) { tl = new TabletLocatorImpl(MetadataTable.ID, getLocator(context, RootTable.ID), mlo, - new ZookeeperLockChecker(context)); + context.getTServerLockChecker()); } else { tl = new TabletLocatorImpl(tableId, getLocator(context, MetadataTable.ID), mlo, - new ZookeeperLockChecker(context)); + context.getTServerLockChecker()); } locators.put(key, tl); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java index 2ee17ee834..46fce95cfc 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java @@ -51,7 +51,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriterConfig; @@ -73,7 +72,6 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.TabletIdImpl; import org.apache.accumulo.core.dataImpl.thrift.TMutation; import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; @@ -1088,9 +1086,7 @@ public class TabletServerBatchWriter implements AutoCloseable { * Checks if there is a lock held by a tserver at a specific host and port. */ private boolean isALockHeld(String tserver) { - var root = context.getZooKeeperRoot() + Constants.ZTSERVERS; - var zLockPath = ServiceLock.path(root + "/" + tserver); - return ServiceLock.getSessionId(context.getZooCache(), zLockPath) != 0; + return context.getTServerLockChecker().doesTabletServerLockExist(tserver); } private void cancelSession() throws InterruptedException, ThriftSecurityException { diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java index 23f036b8f0..8d99d36a30 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java @@ -33,6 +33,11 @@ public class ZookeeperLockChecker implements TabletServerLockChecker { this.root = context.getZooKeeperRoot() + Constants.ZTSERVERS; } + public boolean doesTabletServerLockExist(String server) { + var zLockPath = ServiceLock.path(root + "/" + server); + return ServiceLock.getSessionId(zc, zLockPath) != 0; + } + @Override public boolean isLockHeld(String tserver, String session) { var zLockPath = ServiceLock.path(root + "/" + tserver); diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ListCompactionsCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ListCompactionsCommand.java index 7dcba79f7c..7c5f32cc48 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/ListCompactionsCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ListCompactionsCommand.java @@ -85,7 +85,8 @@ public class ListCompactionsCommand extends Command { filterOption = new Option("f", "filter", true, "show only compactions that match the regex"); opts.addOption(filterOption); - tserverOption = new Option("ts", "tabletServer", true, "tablet server to list compactions for"); + tserverOption = new Option("ts", "tabletServer", true, + "tablet server or compactor to list compactions for"); tserverOption.setArgName("tablet server"); opts.addOption(tserverOption); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index 3f1b63c5d2..fd50fd199e 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -48,7 +48,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -57,9 +59,14 @@ import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv; import org.apache.accumulo.coordinator.CompactionCoordinator; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.ActiveCompaction; +import org.apache.accumulo.core.client.admin.ActiveCompaction.CompactionHost; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.PluginConfig; @@ -84,12 +91,14 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; +import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.SlowIterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.AfterEach; @@ -531,4 +540,86 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { } + @Test + public void testGetActiveCompactions() throws Exception { + final String table1 = this.getUniqueNames(1)[0]; + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + getCluster().getClusterControl().startCompactors(Compactor.class, 1, QUEUE8); + getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class); + + createTable(client, table1, "cs8"); + + try (BatchWriter bw = client.createBatchWriter(table1)) { + for (int i = 1; i <= MAX_DATA; i++) { + Mutation m = new Mutation(Integer.toString(i)); + m.put("cf", "cq", new Value()); + bw.addMutation(m); + bw.flush(); + // flush often to create multiple files to compact + client.tableOperations().flush(table1, null, null, true); + } + } + + final AtomicReference<Exception> error = new AtomicReference<>(); + final CountDownLatch started = new CountDownLatch(1); + Thread t = new Thread(() -> { + try { + IteratorSetting setting = new IteratorSetting(50, "sleepy", SlowIterator.class); + setting.addOption("sleepTime", "3000"); + setting.addOption("seekSleepTime", "3000"); + client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorScope.majc)); + started.countDown(); + client.tableOperations().compact(table1, new CompactionConfig().setWait(true)); + } catch (AccumuloSecurityException | TableNotFoundException | AccumuloException e) { + error.set(e); + } + }); + t.start(); + + started.await(); + + List<ActiveCompaction> compactions = new ArrayList<>(); + do { + client.instanceOperations().getActiveCompactions().forEach((ac) -> { + try { + if (ac.getTable().equals(table1)) { + compactions.add(ac); + } + } catch (TableNotFoundException e1) { + fail("Table was deleted during test, should not happen"); + } + }); + Thread.sleep(1000); + } while (compactions.isEmpty()); + + ActiveCompaction running1 = compactions.get(0); + CompactionHost host = running1.getHost(); + assertTrue(host.getType() == CompactionHost.Type.COMPACTOR); + + compactions.clear(); + do { + HostAndPort hp = HostAndPort.fromParts(host.getAddress(), host.getPort()); + client.instanceOperations().getActiveCompactions(hp.toString()).forEach((ac) -> { + try { + if (ac.getTable().equals(table1)) { + compactions.add(ac); + } + } catch (TableNotFoundException e1) { + fail("Table was deleted during test, should not happen"); + } + }); + Thread.sleep(1000); + } while (compactions.isEmpty()); + + ActiveCompaction running2 = compactions.get(0); + assertEquals(running1.getInputFiles(), running2.getInputFiles()); + assertEquals(running1.getOutputFile(), running2.getOutputFile()); + assertEquals(running1.getTablet(), running2.getTablet()); + + client.tableOperations().cancelCompaction(table1); + t.join(); + } + } + } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index 46aa609e2c..56e57bac33 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; import java.time.Duration; @@ -36,6 +37,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -49,6 +51,8 @@ import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.ActiveCompaction; +import org.apache.accumulo.core.client.admin.ActiveCompaction.CompactionHost; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.PluginConfig; @@ -80,6 +84,7 @@ import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner; import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; +import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.VerifyIngest; @@ -715,6 +720,83 @@ public class CompactionIT extends AccumuloClusterHarness { } } + @Test + public void testGetActiveCompactions() throws Exception { + final String table1 = this.getUniqueNames(1)[0]; + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().create(table1); + try (BatchWriter bw = client.createBatchWriter(table1)) { + for (int i = 1; i <= MAX_DATA; i++) { + Mutation m = new Mutation(Integer.toString(i)); + m.put("cf", "cq", new Value()); + bw.addMutation(m); + bw.flush(); + // flush often to create multiple files to compact + client.tableOperations().flush(table1, null, null, true); + } + } + + final AtomicReference<Exception> error = new AtomicReference<>(); + final CountDownLatch started = new CountDownLatch(1); + Thread t = new Thread(() -> { + try { + IteratorSetting setting = new IteratorSetting(50, "sleepy", SlowIterator.class); + setting.addOption("sleepTime", "3000"); + setting.addOption("seekSleepTime", "3000"); + client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorScope.majc)); + started.countDown(); + client.tableOperations().compact(table1, new CompactionConfig().setWait(true)); + } catch (AccumuloSecurityException | TableNotFoundException | AccumuloException e) { + error.set(e); + } + }); + t.start(); + + started.await(); + + List<ActiveCompaction> compactions = new ArrayList<>(); + do { + client.instanceOperations().getActiveCompactions().forEach((ac) -> { + try { + if (ac.getTable().equals(table1)) { + compactions.add(ac); + } + } catch (TableNotFoundException e1) { + fail("Table was deleted during test, should not happen"); + } + }); + Thread.sleep(1000); + } while (compactions.isEmpty()); + + ActiveCompaction running1 = compactions.get(0); + CompactionHost host = running1.getHost(); + assertTrue(host.getType() == CompactionHost.Type.TSERVER); + + compactions.clear(); + do { + HostAndPort hp = HostAndPort.fromParts(host.getAddress(), host.getPort()); + client.instanceOperations().getActiveCompactions(hp.toString()).forEach((ac) -> { + try { + if (ac.getTable().equals(table1)) { + compactions.add(ac); + } + } catch (TableNotFoundException e1) { + fail("Table was deleted during test, should not happen"); + } + }); + Thread.sleep(1000); + } while (compactions.isEmpty()); + + ActiveCompaction running2 = compactions.get(0); + assertEquals(running1.getInputFiles(), running2.getInputFiles()); + assertEquals(running1.getOutputFile(), running2.getOutputFile()); + assertEquals(running1.getTablet(), running2.getTablet()); + + client.tableOperations().cancelCompaction(table1); + t.join(); + } + } + /** * Counts the number of tablets and files in a table. */