This is an automated email from the ASF dual-hosted git repository.
krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 4d440c52d1 Status, Reservation, and Fate_Op columns are now in their
own column family and locality group (#5031)
4d440c52d1 is described below
commit 4d440c52d157c205c900643d36d15f5a015a8277
Author: Arbaaz Khan <[email protected]>
AuthorDate: Mon May 12 10:38:41 2025 -0400
Status, Reservation, and Fate_Op columns are now in their own column family
and locality group (#5031)
---
.../accumulo/core/fate/user/FateMutator.java | 11 +++++------
.../accumulo/core/fate/user/FateMutatorImpl.java | 17 ++++++++--------
.../core/fate/user/RowFateStatusFilter.java | 2 +-
.../core/fate/user/StatusMappingIterator.java | 2 +-
.../accumulo/core/fate/user/UserFateStore.java | 22 +++++++++++----------
.../accumulo/core/fate/user/schema/FateSchema.java | 23 ++++++++++++++--------
.../accumulo/server/init/InitialConfiguration.java | 10 +++++-----
.../apache/accumulo/test/fate/user/UserFateIT.java | 4 ++--
8 files changed, 50 insertions(+), 41 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
index 0280dbf749..f8a6501147 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
@@ -24,6 +24,7 @@ import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.fate.user.schema.FateSchema;
public interface FateMutator<T> {
@@ -57,9 +58,8 @@ public interface FateMutator<T> {
FateMutator<T> requireAbsentKey();
/**
- * Add a conditional mutation to
- * {@link
org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily#RESERVATION_COLUMN}
- * that will put the reservation if there is not already a reservation
present
+ * Add a conditional mutation to {@link
FateSchema.TxAdminColumnFamily#RESERVATION_COLUMN} that
+ * will put the reservation if there is not already a reservation present
*
* @param reservation the reservation to attempt to put
* @return the FateMutator with this added mutation
@@ -67,9 +67,8 @@ public interface FateMutator<T> {
FateMutator<T> putReservedTx(FateStore.FateReservation reservation);
/**
- * Add a conditional mutation to
- * {@link
org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily#RESERVATION_COLUMN}
- * that will delete the column if the column value matches the given
reservation
+ * Add a conditional mutation to {@link
FateSchema.TxAdminColumnFamily#RESERVATION_COLUMN} that
+ * will delete the column if the column value matches the given reservation
*
* @param reservation the reservation to attempt to remove
* @return the FateMutator with this added mutation
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
index bb33f6ea81..264198cf93 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily;
+import
org.apache.accumulo.core.fate.user.schema.FateSchema.TxAdminColumnFamily;
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily;
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
@@ -69,7 +70,7 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
@Override
public FateMutator<T> putStatus(TStatus status) {
- TxColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name()));
+ TxAdminColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name()));
return this;
}
@@ -96,8 +97,8 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
@Override
public FateMutator<T> requireUnreserved() {
Preconditions.checkState(!requiredUnreserved);
- Condition condition = new
Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
- TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
+ Condition condition = new
Condition(TxAdminColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+ TxAdminColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
mutation.addCondition(condition);
requiredUnreserved = true;
return this;
@@ -114,23 +115,23 @@ public class FateMutatorImpl<T> implements FateMutator<T>
{
@Override
public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) {
requireUnreserved();
- TxColumnFamily.RESERVATION_COLUMN.put(mutation, new
Value(reservation.getSerialized()));
+ TxAdminColumnFamily.RESERVATION_COLUMN.put(mutation, new
Value(reservation.getSerialized()));
return this;
}
@Override
public FateMutator<T> putUnreserveTx(FateStore.FateReservation reservation) {
- Condition condition = new
Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
- TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier())
+ Condition condition = new
Condition(TxAdminColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+ TxAdminColumnFamily.RESERVATION_COLUMN.getColumnQualifier())
.setValue(reservation.getSerialized());
mutation.addCondition(condition);
- TxColumnFamily.RESERVATION_COLUMN.putDelete(mutation);
+ TxAdminColumnFamily.RESERVATION_COLUMN.putDelete(mutation);
return this;
}
@Override
public FateMutator<T> putFateOp(byte[] data) {
- TxInfoColumnFamily.FATE_OP_COLUMN.put(mutation, new Value(data));
+ TxAdminColumnFamily.FATE_OP_COLUMN.put(mutation, new Value(data));
return this;
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java
index f6a6b3bef5..a0e19e1faf 100644
---
a/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java
+++
b/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java
@@ -58,7 +58,7 @@ public class RowFateStatusFilter extends WholeRowIterator {
protected boolean filter(Text currentRow, List<Key> keys, List<Value>
values) {
for (int i = 0; i < keys.size(); i++) {
Key key = keys.get(i);
- if (FateSchema.TxColumnFamily.STATUS_COLUMN.hasColumns(key)
+ if (FateSchema.TxAdminColumnFamily.STATUS_COLUMN.hasColumns(key)
&&
valuesToAccept.contains(ReadOnlyFateStore.TStatus.valueOf(values.get(i).toString())))
{
return true;
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java
index 1a0fae5aa3..83538cbb1c 100644
---
a/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java
+++
b/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java
@@ -18,7 +18,7 @@
*/
package org.apache.accumulo.core.fate.user;
-import static
org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily.STATUS_COLUMN;
+import static
org.apache.accumulo.core.fate.user.schema.FateSchema.TxAdminColumnFamily.STATUS_COLUMN;
import java.io.IOException;
import java.util.Arrays;
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
index 311c2890dd..d3d117c9fa 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@ -58,6 +58,7 @@ import org.apache.accumulo.core.fate.ReadOnlyRepo;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.StackOverflowException;
import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily;
+import
org.apache.accumulo.core.fate.user.schema.FateSchema.TxAdminColumnFamily;
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily;
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
@@ -224,8 +225,8 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
// attempt or was not written at all).
try (Scanner scanner = context.createScanner(tableName,
Authorizations.EMPTY)) {
scanner.setRange(getRow(fateId));
-
scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
- TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
+
scanner.fetchColumn(TxAdminColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+ TxAdminColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
FateReservation persistedRes =
scanner.stream().map(entry ->
FateReservation.deserialize(entry.getValue().get()))
.findFirst().orElse(null);
@@ -270,9 +271,10 @@ public class UserFateStore<T> extends AbstractFateStore<T>
{
Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY);
scanner.setRange(new Range());
RowFateStatusFilter.configureScanner(scanner, statuses);
- TxColumnFamily.STATUS_COLUMN.fetch(scanner);
- TxColumnFamily.RESERVATION_COLUMN.fetch(scanner);
- TxInfoColumnFamily.FATE_OP_COLUMN.fetch(scanner);
+ // columns fetched here must be in/added to TxAdminColumnFamily for
locality group benefits
+ TxAdminColumnFamily.STATUS_COLUMN.fetch(scanner);
+ TxAdminColumnFamily.RESERVATION_COLUMN.fetch(scanner);
+ TxAdminColumnFamily.FATE_OP_COLUMN.fetch(scanner);
return scanner.stream().onClose(scanner::close).map(e -> {
String txUUIDStr = e.getKey().getRow().toString();
FateId fateId = FateId.from(fateInstanceType, txUUIDStr);
@@ -296,13 +298,13 @@ public class UserFateStore<T> extends
AbstractFateStore<T> {
Text colq = entry.getKey().getColumnQualifier();
Value val = entry.getValue();
switch (colq.toString()) {
- case TxColumnFamily.STATUS:
+ case TxAdminColumnFamily.STATUS:
status = TStatus.valueOf(val.toString());
break;
- case TxColumnFamily.RESERVATION:
+ case TxAdminColumnFamily.RESERVATION:
reservation = FateReservation.deserialize(val.get());
break;
- case TxInfoColumnFamily.FATE_OP:
+ case TxAdminColumnFamily.FATE_OP:
fateOp = (Fate.FateOperation) deserializeTxInfo(TxInfo.FATE_OP,
val.get());
break;
default:
@@ -352,7 +354,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
protected TStatus _getStatus(FateId fateId) {
return scanTx(scanner -> {
scanner.setRange(getRow(fateId));
- TxColumnFamily.STATUS_COLUMN.fetch(scanner);
+ TxAdminColumnFamily.STATUS_COLUMN.fetch(scanner);
return scanner.stream().map(e ->
TStatus.valueOf(e.getValue().toString())).findFirst()
.orElse(TStatus.UNKNOWN);
});
@@ -555,7 +557,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
final ColumnFQ cq;
switch (txInfo) {
case FATE_OP:
- cq = TxInfoColumnFamily.FATE_OP_COLUMN;
+ cq = TxAdminColumnFamily.FATE_OP_COLUMN;
break;
case AUTO_CLEAN:
cq = TxInfoColumnFamily.AUTO_CLEAN_COLUMN;
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java
index 889ceac1e5..7377b5fc99 100644
---
a/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java
+++
b/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java
@@ -27,26 +27,18 @@ public class FateSchema {
public static final String STR_NAME = "tx";
public static final Text NAME = new Text(STR_NAME);
- public static final String STATUS = "status";
- public static final ColumnFQ STATUS_COLUMN = new ColumnFQ(NAME, new
Text(STATUS));
-
public static final String TX_KEY = "txkey";
public static final ColumnFQ TX_KEY_COLUMN = new ColumnFQ(NAME, new
Text(TX_KEY));
public static final String CREATE_TIME = "ctime";
public static final ColumnFQ CREATE_TIME_COLUMN = new ColumnFQ(NAME, new
Text(CREATE_TIME));
- public static final String RESERVATION = "reservation";
- public static final ColumnFQ RESERVATION_COLUMN = new ColumnFQ(NAME, new
Text(RESERVATION));
}
public static class TxInfoColumnFamily {
public static final String STR_NAME = "txinfo";
public static final Text NAME = new Text(STR_NAME);
- public static final String FATE_OP = "fateop";
- public static final ColumnFQ FATE_OP_COLUMN = new ColumnFQ(NAME, new
Text(FATE_OP));
-
public static final String AUTO_CLEAN = "autoclean";
public static final ColumnFQ AUTO_CLEAN_COLUMN = new ColumnFQ(NAME, new
Text(AUTO_CLEAN));
@@ -65,4 +57,19 @@ public class FateSchema {
public static final Text NAME = new Text(STR_NAME);
}
+ // when FATE looks for work, this is the family scanned
+ public static class TxAdminColumnFamily {
+ public static final String STR_NAME = "txadmin";
+ public static final Text NAME = new Text(STR_NAME);
+
+ public static final String STATUS = "status";
+ public static final ColumnFQ STATUS_COLUMN = new ColumnFQ(NAME, new
Text(STATUS));
+
+ public static final String RESERVATION = "reservation";
+ public static final ColumnFQ RESERVATION_COLUMN = new ColumnFQ(NAME, new
Text(RESERVATION));
+
+ public static final String FATE_OP = "fateop";
+ public static final ColumnFQ FATE_OP_COLUMN = new ColumnFQ(NAME, new
Text(FATE_OP));
+ }
+
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java
b/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java
index cba59893d6..08b58099e3 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java
@@ -88,11 +88,11 @@ public class InitialConfiguration {
initialFateTableConf.putAll(commonConfig);
initialFateTableConf.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "256M");
- // Create a locality group that contains status so its fast to scan. When
fate looks for work is
- // scans this family.
- initialFateTableConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() +
"status",
- FateSchema.TxColumnFamily.STR_NAME);
- initialFateTableConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(),
"status");
+ // Create a locality group that contains tx admin columns so its fast to
scan. When fate
+ // looks for work it scans this family.
+ initialFateTableConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() +
"txAdmin",
+ FateSchema.TxAdminColumnFamily.STR_NAME);
+ initialFateTableConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(),
"txAdmin");
initialScanRefTableConf.putAll(commonConfig);
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java
index e4a4d2c159..30d45fedfb 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java
@@ -36,7 +36,7 @@ import
org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.fate.user.UserFateStore;
-import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily;
+import org.apache.accumulo.core.fate.user.schema.FateSchema;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
import org.apache.accumulo.core.metadata.SystemTables;
import org.apache.accumulo.core.security.Authorizations;
@@ -124,7 +124,7 @@ public class UserFateIT extends FateIT {
protected TStatus getTxStatus(ServerContext context, FateId fateId) {
try (Scanner scanner = context.createScanner(table, Authorizations.EMPTY))
{
scanner.setRange(getRow(fateId));
- TxColumnFamily.STATUS_COLUMN.fetch(scanner);
+ FateSchema.TxAdminColumnFamily.STATUS_COLUMN.fetch(scanner);
return StreamSupport.stream(scanner.spliterator(), false)
.map(e ->
TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN);
} catch (TableNotFoundException e) {