This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 2250ad1c31 Replaces pair with record in putSplits impl (#5906)
2250ad1c31 is described below
commit 2250ad1c31c4510c0319c7080e0a24091539e73b
Author: Keith Turner <[email protected]>
AuthorDate: Tue Sep 23 10:39:31 2025 -0400
Replaces pair with record in putSplits impl (#5906)
---
.../core/clientImpl/TableOperationsImpl.java | 35 ++++++++++++----------
.../core/clientImpl/TabletMergeabilityUtil.java | 27 ++++++-----------
.../clientImpl/TabletMergeabilityUtilTest.java | 12 ++++----
.../accumulo/manager/FateServiceHandler.java | 7 +++--
.../apache/accumulo/manager/tableOps/Utils.java | 6 ++--
.../accumulo/manager/tableOps/split/SplitInfo.java | 6 ++--
6 files changed, 45 insertions(+), 48 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index fc85049b13..082b57169c 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@ -516,7 +516,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
tabLocator.invalidateCache();
var splitsToTablets = mapSplitsToTablets(tableName, tableId, tabLocator,
splitsTodo);
- Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits =
splitsToTablets.newSplits;
+ Map<KeyExtent,List<SplitMergeability>> tabletSplits =
splitsToTablets.newSplits;
Map<KeyExtent,TabletMergeability> existingSplits =
splitsToTablets.existingSplits;
List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -542,9 +542,11 @@ public class TableOperationsImpl extends
TableOperationsHelper {
}, waitExecutor));
}
+ record FateSplits(TFateId opid, List<SplitMergeability> splits) {
+ }
+
// begin the fate operation for each tablet without waiting for the
operation to complete
- for (Entry<KeyExtent,List<Pair<Text,TabletMergeability>>>
splitsForTablet : tabletSplits
- .entrySet()) {
+ for (Entry<KeyExtent,List<SplitMergeability>> splitsForTablet :
tabletSplits.entrySet()) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
var extent = splitsForTablet.getKey();
@@ -562,7 +564,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
TFateInstanceType t =
FateInstanceType.fromNamespaceOrTableName(tableName).toThrift();
TFateId opid = beginFateOperation(t);
executeFateOperation(opid, TFateOperation.TABLE_SPLIT, args,
Map.of(), false);
- return new Pair<>(opid, splitsForTablet.getValue());
+ return new FateSplits(opid, splitsForTablet.getValue());
}, tableName);
} catch (TableExistsException | NamespaceExistsException |
NamespaceNotFoundException
| AccumuloSecurityException | TableNotFoundException |
AccumuloException e) {
@@ -571,15 +573,15 @@ public class TableOperationsImpl extends
TableOperationsHelper {
throw new CompletionException(e);
}
// wait for the fate operation to complete in a separate thread pool
- }, startExecutor).thenApplyAsync(pair -> {
- final TFateId opid = pair.getFirst();
- final List<Pair<Text,TabletMergeability>> completedSplits =
pair.getSecond();
+ }, startExecutor).thenApplyAsync(fateSplits -> {
try {
- String status = handleFateOperation(() ->
waitForFateOperation(opid), tableName);
+ String status =
+ handleFateOperation(() ->
waitForFateOperation(fateSplits.opid()), tableName);
if (SPLIT_SUCCESS_MSG.equals(status)) {
-
completedSplits.stream().map(Pair::getFirst).forEach(splitsTodo::remove);
+ fateSplits.splits().stream().map(SplitMergeability::split)
+ .forEach(splitsTodo::remove);
}
} catch (TableExistsException | NamespaceExistsException |
NamespaceNotFoundException
| AccumuloSecurityException | TableNotFoundException |
AccumuloException e) {
@@ -588,9 +590,9 @@ public class TableOperationsImpl extends
TableOperationsHelper {
throw new CompletionException(e);
} finally {
// always finish table op, even when exception
- if (opid != null) {
+ if (fateSplits.opid() != null) {
try {
- finishFateOperation(opid);
+ finishFateOperation(fateSplits.opid());
} catch (Exception e) {
log.warn("Exception thrown while finishing fate table
operation", e);
}
@@ -634,7 +636,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
private SplitsToTablets mapSplitsToTablets(String tableName, TableId tableId,
ClientTabletCache tabLocator, SortedMap<Text,TabletMergeability>
splitsTodo)
throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
- Map<KeyExtent,List<Pair<Text,TabletMergeability>>> newSplits = new
HashMap<>();
+ Map<KeyExtent,List<SplitMergeability>> newSplits = new HashMap<>();
Map<KeyExtent,TabletMergeability> existingSplits = new HashMap<>();
for (Entry<Text,TabletMergeability> splitEntry : splitsTodo.entrySet()) {
@@ -664,7 +666,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
}
newSplits.computeIfAbsent(tablet.getExtent(), k -> new ArrayList<>())
- .add(Pair.fromEntry(splitEntry));
+ .add(new SplitMergeability(splitEntry.getKey(),
splitEntry.getValue()));
} catch (InvalidTabletHostingRequestException e) {
// not expected
@@ -674,11 +676,14 @@ public class TableOperationsImpl extends
TableOperationsHelper {
return new SplitsToTablets(newSplits, existingSplits);
}
+ public record SplitMergeability(Text split, TabletMergeability mergeability)
{
+ }
+
private static class SplitsToTablets {
- final Map<KeyExtent,List<Pair<Text,TabletMergeability>>> newSplits;
+ final Map<KeyExtent,List<SplitMergeability>> newSplits;
final Map<KeyExtent,TabletMergeability> existingSplits;
- private SplitsToTablets(Map<KeyExtent,List<Pair<Text,TabletMergeability>>>
newSplits,
+ private SplitsToTablets(Map<KeyExtent,List<SplitMergeability>> newSplits,
Map<KeyExtent,TabletMergeability> existingSplits) {
this.newSplits = Objects.requireNonNull(newSplits);
this.existingSplits = Objects.requireNonNull(existingSplits);
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtil.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtil.java
index 18141dae1d..d26ff5804c 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtil.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtil.java
@@ -32,10 +32,10 @@ import java.util.function.Supplier;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.client.admin.TabletMergeabilityInfo;
+import
org.apache.accumulo.core.clientImpl.TableOperationsImpl.SplitMergeability;
import org.apache.accumulo.core.manager.thrift.TTabletMergeability;
import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata;
import org.apache.accumulo.core.util.ByteBufferUtil;
-import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.core.util.json.ByteArrayToBase64TypeAdapter;
import org.apache.accumulo.core.util.time.SteadyTime;
@@ -73,30 +73,24 @@ public class TabletMergeabilityUtil {
// Encode A split and TabletMergeability as json. The split will be Base64
encoded
public static String encode(Text split, TabletMergeability
tabletMergeability) {
- GSonData jData = new GSonData();
- jData.split = TextUtil.getBytes(split);
- jData.never = tabletMergeability.isNever();
- jData.delay =
tabletMergeability.getDelay().map(Duration::toNanos).orElse(null);
+ GSonData jData = new GSonData(TextUtil.getBytes(split),
tabletMergeability.isNever(),
+ tabletMergeability.getDelay().map(Duration::toNanos).orElse(null));
return gson.toJson(jData);
}
- public static ByteBuffer encodeAsBuffer(Pair<Text,TabletMergeability> split)
{
- return ByteBuffer.wrap(encode(split).getBytes(UTF_8));
+ public static ByteBuffer encodeAsBuffer(SplitMergeability sm) {
+ return ByteBuffer.wrap(encode(sm.split(),
sm.mergeability()).getBytes(UTF_8));
}
- public static String encode(Pair<Text,TabletMergeability> split) {
- return encode(split.getFirst(), split.getSecond());
- }
-
- public static Pair<Text,TabletMergeability> decode(ByteBuffer data) {
+ public static SplitMergeability decode(ByteBuffer data) {
return decode(ByteBufferUtil.toString(data));
}
- public static Pair<Text,TabletMergeability> decode(String data) {
+ public static SplitMergeability decode(String data) {
GSonData jData = gson.fromJson(data, GSonData.class);
Preconditions.checkArgument(jData.never == (jData.delay == null),
"delay should both be set if and only if mergeability 'never' is
false");
- return new Pair<>(new Text(jData.split), jData.never ?
TabletMergeability.never()
+ return new SplitMergeability(new Text(jData.split), jData.never ?
TabletMergeability.never()
: TabletMergeability.after(Duration.ofNanos(jData.delay)));
}
@@ -125,10 +119,7 @@ public class TabletMergeabilityUtil {
return currentTime.compareTo(totalDelay) >= 0;
}
- private static class GSonData {
- byte[] split;
- boolean never;
- Long delay;
+ private record GSonData(byte[] split, boolean never, Long delay) {
}
}
diff --git
a/core/src/test/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtilTest.java
b/core/src/test/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtilTest.java
index 42564322fe..16c1b7c6f3 100644
---
a/core/src/test/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtilTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtilTest.java
@@ -34,8 +34,8 @@ public class TabletMergeabilityUtilTest {
var text = getRandomText();
String json = TabletMergeabilityUtil.encode(text,
TabletMergeability.never());
var decoded = TabletMergeabilityUtil.decode(json);
- assertEquals(text, decoded.getFirst());
- assertEquals(TabletMergeability.never(), decoded.getSecond());
+ assertEquals(text, decoded.split());
+ assertEquals(TabletMergeability.never(), decoded.mergeability());
}
@Test
@@ -43,8 +43,8 @@ public class TabletMergeabilityUtilTest {
var text = getRandomText();
String json = TabletMergeabilityUtil.encode(text,
TabletMergeability.always());
var decoded = TabletMergeabilityUtil.decode(json);
- assertEquals(text, decoded.getFirst());
- assertEquals(TabletMergeability.always(), decoded.getSecond());
+ assertEquals(text, decoded.split());
+ assertEquals(TabletMergeability.always(), decoded.mergeability());
}
@Test
@@ -53,8 +53,8 @@ public class TabletMergeabilityUtilTest {
var jsonBuffer =
TabletMergeabilityUtil.encodeAsBuffer(text,
TabletMergeability.after(Duration.ofDays(1)));
var decoded = TabletMergeabilityUtil.decode(jsonBuffer);
- assertEquals(text, decoded.getFirst());
- assertEquals(TabletMergeability.after(Duration.ofDays(1)),
decoded.getSecond());
+ assertEquals(text, decoded.split());
+ assertEquals(TabletMergeability.after(Duration.ofDays(1)),
decoded.mergeability());
}
private Text getRandomText() {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
index 1e125a35af..358569b19b 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
@@ -54,6 +54,7 @@ import
org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
+import
org.apache.accumulo.core.clientImpl.TableOperationsImpl.SplitMergeability;
import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil;
import org.apache.accumulo.core.clientImpl.UserCompactionUtils;
import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
@@ -81,7 +82,6 @@ import org.apache.accumulo.core.manager.thrift.TFateOperation;
import org.apache.accumulo.core.manager.thrift.ThriftPropertyException;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.util.ByteBufferUtil;
-import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.core.util.Validator;
import org.apache.accumulo.core.util.tables.TableNameUtil;
@@ -746,8 +746,9 @@ class FateServiceHandler implements FateService.Iface {
SortedMap<Text,
TabletMergeability> splits = arguments.subList(SPLIT_OFFSET,
arguments.size()).stream()
- .map(TabletMergeabilityUtil::decode).collect(
- Collectors.toMap(Pair::getFirst, Pair::getSecond, (a, b)
-> a, TreeMap::new));
+ .map(TabletMergeabilityUtil::decode)
+ .collect(Collectors.toMap(SplitMergeability::split,
SplitMergeability::mergeability,
+ (a, b) -> a, TreeMap::new));
KeyExtent extent = new KeyExtent(tableId, endRow, prevEndRow);
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
index 41b5759d7a..b0e5df03ed 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
@@ -34,6 +34,7 @@ import java.util.function.Function;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
+import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
@@ -53,7 +54,6 @@ import
org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooReservation;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
-import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.fs.FileSystem;
@@ -338,8 +338,8 @@ public class Utils {
while (file.hasNextLine()) {
String line = file.nextLine();
log.trace("split line: {}", line);
- Pair<Text,TabletMergeability> splitTm =
TabletMergeabilityUtil.decode(line);
- data.put(splitTm.getFirst(), splitTm.getSecond());
+ TableOperationsImpl.SplitMergeability splitTm =
TabletMergeabilityUtil.decode(line);
+ data.put(splitTm.split(), splitTm.mergeability());
}
}
return data;
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java
index fe71e91d55..c13862905c 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java
@@ -68,9 +68,9 @@ public class SplitInfo implements Serializable {
NavigableMap<Text,TabletMergeability> getSplits() {
NavigableMap<Text,TabletMergeability> splits = new TreeMap<>();
- for (int i = 0; i < this.splits.length; i++) {
- var split =
TabletMergeabilityUtil.decode(ByteBuffer.wrap(this.splits[i]));
- splits.put(split.getFirst(), split.getSecond());
+ for (byte[] split : this.splits) {
+ var splitMergeability =
TabletMergeabilityUtil.decode(ByteBuffer.wrap(split));
+ splits.put(splitMergeability.split(), splitMergeability.mergeability());
}
return splits;
}