This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new d1c4f90394 Uses ServerContext instead of Manager in various fate code (#5943)
d1c4f90394 is described below
commit d1c4f90394b6de4965eaaaae924efe8eb947a143
Author: Keith Turner <[email protected]>
AuthorDate: Tue Sep 30 10:44:55 2025 -0400
Uses ServerContext instead of Manager in various fate code (#5943)
Modified functions in the fate code that were taking a Manager argument
when a ServerContext could have been used instead.
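For context, the pattern applied throughout the diff below is sketched here; the FateHelper class and doWork methods are hypothetical illustrations, not part of this commit:

    import org.apache.accumulo.manager.Manager;
    import org.apache.accumulo.server.ServerContext;

    class FateHelper {
      // before: the helper accepted a full Manager only to reach its ServerContext
      static void doWork(Manager manager) {
        var ample = manager.getContext().getAmple();
        // ... use ample ...
      }

      // after: the helper accepts the narrower ServerContext directly,
      // and callers pass manager.getContext()
      static void doWork(ServerContext ctx) {
        var ample = ctx.getAmple();
        // ... use ample ...
      }
    }

Narrowing the parameter to ServerContext makes the actual dependency explicit and, as the test change at the end of the diff shows, lets unit tests mock only ServerContext rather than the whole Manager.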
---
.../org/apache/accumulo/manager/tableOps/Utils.java | 4 ++--
.../manager/tableOps/compact/CompactionDriver.java | 6 +++---
.../accumulo/manager/tableOps/create/ChooseDir.java | 19 ++++++++++---------
.../manager/tableOps/create/FinishCreateTable.java | 7 ++++---
.../manager/tableOps/create/PopulateMetadata.java | 2 +-
.../manager/tableOps/delete/PreDeleteTable.java | 10 +++++-----
.../manager/tableOps/split/UpdateTablets.java | 8 +++++---
.../manager/tableOps/tableImport/CreateImportDir.java | 10 +++++-----
.../manager/tableOps/tableImport/ImportTable.java | 9 +++++----
.../manager/tableOps/tableImport/ImportTableTest.java | 11 ++++-------
10 files changed, 44 insertions(+), 42 deletions(-)
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
index f365bfc425..5b763dcb5b 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
@@ -333,9 +333,9 @@ public class Utils {
return data;
}
- public static SortedMap<Text,TabletMergeability> getSortedSplitsFromFile(Manager manager,
+ public static SortedMap<Text,TabletMergeability> getSortedSplitsFromFile(ServerContext ctx,
Path path) throws IOException {
- FileSystem fs = path.getFileSystem(manager.getContext().getHadoopConf());
+ FileSystem fs = path.getFileSystem(ctx.getHadoopConf());
var data = new TreeMap<Text,TabletMergeability>();
try (var file = new java.util.Scanner(fs.open(path), UTF_8)) {
while (file.hasNextLine()) {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index 7afb061c69..d1b43db6f9 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -349,7 +349,7 @@ public class CompactionDriver extends ManagerRepo {
@Override
public void undo(FateId fateId, Manager env) throws Exception {
- cleanupTabletMetadata(fateId, env);
+ cleanupTabletMetadata(fateId, env.getContext());
// For any compactions that may have happened before this operation failed, attempt to refresh
// tablets.
@@ -359,8 +359,8 @@ public class CompactionDriver extends ManagerRepo {
/**
* Cleans up any tablet metadata that may have been added as part of this compaction operation.
*/
- private void cleanupTabletMetadata(FateId fateId, Manager manager) throws Exception {
- var ample = manager.getContext().getAmple();
+ private void cleanupTabletMetadata(FateId fateId, ServerContext ctx) throws Exception {
+ var ample = ctx.getAmple();
boolean allCleanedUp = false;
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java
index 4df382ef1e..980fc01b26 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java
@@ -34,6 +34,7 @@ import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.manager.tableOps.TableInfo;
import org.apache.accumulo.manager.tableOps.Utils;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.tablets.UniqueNameAllocator;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -60,7 +61,7 @@ class ChooseDir extends ManagerRepo {
@Override
public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
if (tableInfo.getInitialSplitSize() > 0) {
- createTableDirectoriesInfo(manager);
+ createTableDirectoriesInfo(manager.getContext());
}
return new PopulateMetadata(tableInfo);
}
@@ -85,20 +86,20 @@ class ChooseDir extends ManagerRepo {
* Create unique table directory names that will be associated with split values. Then write these
* to the file system for later use during this FATE operation.
*/
- private void createTableDirectoriesInfo(Manager manager) throws IOException {
+ private void createTableDirectoriesInfo(ServerContext ctx) throws IOException {
SortedMap<Text,TabletMergeability> splits =
- Utils.getSortedSplitsFromFile(manager, tableInfo.getSplitPath());
- SortedSet<Text> tabletDirectoryInfo = createTabletDirectoriesSet(manager, splits.size());
- writeTabletDirectoriesToFileSystem(manager, tabletDirectoryInfo);
+ Utils.getSortedSplitsFromFile(ctx, tableInfo.getSplitPath());
+ SortedSet<Text> tabletDirectoryInfo = createTabletDirectoriesSet(ctx, splits.size());
+ writeTabletDirectoriesToFileSystem(ctx, tabletDirectoryInfo);
}
/**
* Create a set of unique table directories. These will be associated with splits in a follow-on
* FATE step.
*/
- private static SortedSet<Text> createTabletDirectoriesSet(Manager manager, int num) {
+ private static SortedSet<Text> createTabletDirectoriesSet(ServerContext ctx, int num) {
String tabletDir;
- UniqueNameAllocator namer = manager.getContext().getUniqueNameAllocator();
+ UniqueNameAllocator namer = ctx.getUniqueNameAllocator();
SortedSet<Text> splitDirs = new TreeSet<>();
Iterator<String> names = namer.getNextNames(num);
for (int i = 0; i < num; i++) {
@@ -112,10 +113,10 @@ class ChooseDir extends ManagerRepo {
* Write the SortedSet of Tablet Directory names to the file system for use in the next phase of
* the FATE operation.
*/
- private void writeTabletDirectoriesToFileSystem(Manager manager, SortedSet<Text> dirs)
+ private void writeTabletDirectoriesToFileSystem(ServerContext ctx, SortedSet<Text> dirs)
throws IOException {
Path p = tableInfo.getSplitDirsPath();
- FileSystem fs = p.getFileSystem(manager.getContext().getHadoopConf());
+ FileSystem fs = p.getFileSystem(ctx.getHadoopConf());
if (fs.exists(p)) {
fs.delete(p, true);
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/FinishCreateTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/FinishCreateTable.java
index 2ba7e03a36..aa75956694 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/FinishCreateTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/FinishCreateTable.java
@@ -30,6 +30,7 @@ import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.manager.tableOps.TableInfo;
import org.apache.accumulo.manager.tableOps.Utils;
+import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -70,18 +71,18 @@ class FinishCreateTable extends ManagerRepo {
tableInfo.getTableName());
if (tableInfo.getInitialSplitSize() > 0) {
- cleanupSplitFiles(env);
+ cleanupSplitFiles(env.getContext());
}
return null;
}
- private void cleanupSplitFiles(Manager env) throws IOException {
+ private void cleanupSplitFiles(ServerContext ctx) throws IOException {
// it is sufficient to delete from the parent, because both files are in the same directory, and
// we want to delete the directory also
Path p = null;
try {
p = tableInfo.getSplitPath().getParent();
- FileSystem fs = p.getFileSystem(env.getContext().getHadoopConf());
+ FileSystem fs = p.getFileSystem(ctx.getHadoopConf());
fs.delete(p, true);
} catch (IOException e) {
log.error("Table was created, but failed to clean up temporary splits
files at {}", p, e);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java
index d487263b10..2c96c6abf4 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java
@@ -67,7 +67,7 @@ class PopulateMetadata extends ManagerRepo {
Map<Text,Text> splitDirMap;
if (tableInfo.getInitialSplitSize() > 0) {
- splits = Utils.getSortedSplitsFromFile(env, tableInfo.getSplitPath());
+ splits = Utils.getSortedSplitsFromFile(env.getContext(), tableInfo.getSplitPath());
SortedSet<Text> dirs = Utils.getSortedSetFromFile(env, tableInfo.getSplitDirsPath(), false);
splitDirMap = createSplitDirectoryMap(splits, dirs);
} else {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java
index 4d7f8a4ca7..9481678d22 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.manager.tableOps.Utils;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.compaction.CompactionConfigStorage;
import org.apache.zookeeper.KeeperException;
@@ -57,18 +58,17 @@ public class PreDeleteTable extends ManagerRepo {
+ Utils.reserveTable(env, tableId, fateId, LockType.READ, true, TableOperation.DELETE);
}
- private void preventFutureCompactions(Manager environment)
+ private void preventFutureCompactions(ServerContext ctx)
throws KeeperException, InterruptedException {
- String deleteMarkerPath =
- createDeleteMarkerPath(environment.getContext().getInstanceID(), tableId);
- ZooReaderWriter zoo = environment.getContext().getZooSession().asReaderWriter();
+ String deleteMarkerPath = createDeleteMarkerPath(ctx.getInstanceID(), tableId);
+ ZooReaderWriter zoo = ctx.getZooSession().asReaderWriter();
zoo.putPersistentData(deleteMarkerPath, new byte[] {}, NodeExistsPolicy.SKIP);
}
@Override
public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
try {
- preventFutureCompactions(environment);
+ preventFutureCompactions(environment.getContext());
var idsToCancel =
CompactionConfigStorage.getAllConfig(environment.getContext(), tableId::equals).keySet();
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
index f7c9d97f67..c07fe6de0c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.metadata.schema.TabletOperationType;
import org.apache.accumulo.core.util.RowRangeUtil;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,7 +118,8 @@ public class UpdateTablets extends ManagerRepo {
// Only update the original tablet after successfully creating the new tablets, this is
// important for failure cases where this operation partially runs a then runs again.
- updateExistingTablet(fateId, manager, tabletMetadata, opid, newTablets, newTabletsFiles);
+ updateExistingTablet(fateId, manager.getContext(), tabletMetadata, opid, newTablets,
+ newTabletsFiles);
return new DeleteOperationIds(splitInfo);
}
@@ -262,10 +264,10 @@ public class UpdateTablets extends ManagerRepo {
}
}
- private void updateExistingTablet(FateId fateId, Manager manager, TabletMetadata tabletMetadata,
+ private void updateExistingTablet(FateId fateId, ServerContext ctx, TabletMetadata tabletMetadata,
TabletOperationId opid, NavigableMap<KeyExtent,TabletMergeability> newTablets,
Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> newTabletsFiles) {
- try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) {
+ try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
var newExtent = newTablets.navigableKeySet().last();
var mutator =
tabletsMutator.mutateTablet(splitInfo.getOriginal()).requireOperation(opid)
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/CreateImportDir.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/CreateImportDir.java
index 8a4b21ee0a..dc408d3882 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/CreateImportDir.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/CreateImportDir.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.tablets.UniqueNameAllocator;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -47,7 +48,7 @@ class CreateImportDir extends ManagerRepo {
Set<String> tableDirs = manager.getContext().getTablesDirs();
- create(tableDirs, manager);
+ create(tableDirs, manager.getContext());
return new MapImportFileNames(tableInfo);
}
@@ -58,19 +59,18 @@ class CreateImportDir extends ManagerRepo {
*
* @param tableDirs the set of table directories on HDFS where files will be moved e.g:
* hdfs://volume1/accumulo/tables/
- * @param manager the manager instance performing the table import.
* @throws IOException if any import directory does not reside on a volume configured for
* accumulo.
*/
- void create(Set<String> tableDirs, Manager manager) throws IOException {
- UniqueNameAllocator namer = manager.getContext().getUniqueNameAllocator();
+ void create(Set<String> tableDirs, ServerContext ctx) throws IOException {
+ UniqueNameAllocator namer = ctx.getUniqueNameAllocator();
Iterator<String> names = namer.getNextNames(tableInfo.directories.size());
for (ImportedTableInfo.DirectoryMapping dm : tableInfo.directories) {
Path exportDir = new Path(dm.exportDir);
log.info("Looking for matching filesystem for {} from options {}",
exportDir, tableDirs);
- Path base = manager.getVolumeManager().matchingFileSystem(exportDir, tableDirs);
+ Path base = ctx.getVolumeManager().matchingFileSystem(exportDir, tableDirs);
if (base == null) {
throw new IOException(
dm.exportDir + " is not in the same file system as any volume
configured for Accumulo");
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportTable.java
index d537b10b35..6ab0a39b76 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportTable.java
@@ -47,6 +47,7 @@ import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.manager.tableOps.Utils;
import org.apache.accumulo.manager.tableOps.tableExport.ExportTable;
import org.apache.accumulo.server.AccumuloDataVersion;
+import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,7 +88,7 @@ public class ImportTable extends ManagerRepo {
@Override
public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
- checkVersions(env);
+ checkVersions(env.getContext());
// first step is to reserve a table id.. if the machine fails during this step
// it is ok to retry... the only side effect is that a table id may not be used
@@ -102,7 +103,7 @@ public class ImportTable extends ManagerRepo {
@SuppressFBWarnings(value = "OS_OPEN_STREAM",
justification = "closing intermediate readers would close the
ZipInputStream")
- public void checkVersions(Manager env) throws AcceptableThriftTableOperationException {
+ public void checkVersions(ServerContext ctx) throws AcceptableThriftTableOperationException {
Set<String> exportDirs =
tableInfo.directories.stream().map(dm -> dm.exportDir).collect(Collectors.toSet());
@@ -112,11 +113,11 @@ public class ImportTable extends ManagerRepo {
Integer dataVersion = null;
try {
- Path exportFilePath = TableOperationsImpl.findExportFile(env.getContext(), exportDirs);
+ Path exportFilePath = TableOperationsImpl.findExportFile(ctx, exportDirs);
tableInfo.exportFile = exportFilePath.toString();
log.info("Export file is {}", tableInfo.exportFile);
- ZipInputStream zis = new ZipInputStream(env.getVolumeManager().open(exportFilePath));
+ ZipInputStream zis = new ZipInputStream(ctx.getVolumeManager().open(exportFilePath));
ZipEntry zipEntry;
while ((zipEntry = zis.getNextEntry()) != null) {
if (zipEntry.getName().equals(Constants.EXPORT_INFO_FILE)) {
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/tableImport/ImportTableTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/tableImport/ImportTableTest.java
index 38e55c8c84..ab4d4638c4 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/tableImport/ImportTableTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/tableImport/ImportTableTest.java
@@ -31,7 +31,6 @@ import java.util.stream.Collectors;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.tablets.UniqueNameAllocator;
@@ -75,7 +74,6 @@ public class ImportTableTest {
@Test
public void testCreateImportDir() throws Exception {
- Manager manager = EasyMock.createMock(Manager.class);
ServerContext context = EasyMock.createMock(ServerContext.class);
VolumeManager volumeManager = EasyMock.createMock(VolumeManager.class);
UniqueNameAllocator uniqueNameAllocator = EasyMock.createMock(UniqueNameAllocator.class);
@@ -90,8 +88,7 @@ public class ImportTableTest {
String dirName = "abcd";
- EasyMock.expect(manager.getContext()).andReturn(context);
- EasyMock.expect(manager.getVolumeManager()).andReturn(volumeManager).times(3);
+ EasyMock.expect(context.getVolumeManager()).andReturn(volumeManager).times(3);
EasyMock.expect(context.getUniqueNameAllocator()).andReturn(uniqueNameAllocator);
EasyMock.expect(volumeManager.matchingFileSystem(EasyMock.eq(new Path(expDirs[0])),
EasyMock.eq(tableDirSet))).andReturn(new Path(tableDirs[0]));
@@ -107,10 +104,10 @@ public class ImportTableTest {
ti.directories = ImportTable.parseExportDir(Set.of(expDirs));
assertEquals(3, ti.directories.size());
- EasyMock.replay(manager, context, volumeManager, uniqueNameAllocator);
+ EasyMock.replay(context, volumeManager, uniqueNameAllocator);
CreateImportDir ci = new CreateImportDir(ti);
- ci.create(tableDirSet, manager);
+ ci.create(tableDirSet, context);
assertEquals(3, ti.directories.size());
for (ImportedTableInfo.DirectoryMapping dm : ti.directories) {
assertNotNull(dm.exportDir);
@@ -120,7 +117,7 @@ public class ImportTableTest {
assertTrue(
dm.importDir.contains(ti.tableId.canonical() + "/" + Constants.BULK_PREFIX + dirName));
}
- EasyMock.verify(manager, context, volumeManager, uniqueNameAllocator);
+ EasyMock.verify(context, volumeManager, uniqueNameAllocator);
}
private static void assertMatchingFilesystem(String expected, String target) {