This is an automated email from the ASF dual-hosted git repository. krathbun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 4d440c52d1 Status, Reservation, and Fate_Op columns are now in their own column family and locality group (#5031) 4d440c52d1 is described below commit 4d440c52d157c205c900643d36d15f5a015a8277 Author: Arbaaz Khan <bazzy...@yahoo.com> AuthorDate: Mon May 12 10:38:41 2025 -0400 Status, Reservation, and Fate_Op columns are now in their own column family and locality group (#5031) --- .../accumulo/core/fate/user/FateMutator.java | 11 +++++------ .../accumulo/core/fate/user/FateMutatorImpl.java | 17 ++++++++-------- .../core/fate/user/RowFateStatusFilter.java | 2 +- .../core/fate/user/StatusMappingIterator.java | 2 +- .../accumulo/core/fate/user/UserFateStore.java | 22 +++++++++++---------- .../accumulo/core/fate/user/schema/FateSchema.java | 23 ++++++++++++++-------- .../accumulo/server/init/InitialConfiguration.java | 10 +++++----- .../apache/accumulo/test/fate/user/UserFateIT.java | 4 ++-- 8 files changed, 50 insertions(+), 41 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java index 0280dbf749..f8a6501147 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java @@ -24,6 +24,7 @@ import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.user.schema.FateSchema; public interface FateMutator<T> { @@ -57,9 +58,8 @@ public interface FateMutator<T> { FateMutator<T> requireAbsentKey(); /** - * Add a conditional mutation to - * {@link org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily#RESERVATION_COLUMN} - * that will put the reservation if there is not already a reservation present + * Add a conditional mutation to {@link FateSchema.TxAdminColumnFamily#RESERVATION_COLUMN} that + * will put the reservation if there is not already a reservation present * * @param reservation the reservation to attempt to put * @return the FateMutator with this added mutation @@ -67,9 +67,8 @@ public interface FateMutator<T> { FateMutator<T> putReservedTx(FateStore.FateReservation reservation); /** - * Add a conditional mutation to - * {@link org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily#RESERVATION_COLUMN} - * that will delete the column if the column value matches the given reservation + * Add a conditional mutation to {@link FateSchema.TxAdminColumnFamily#RESERVATION_COLUMN} that + * will delete the column if the column value matches the given reservation * * @param reservation the reservation to attempt to remove * @return the FateMutator with this added mutation diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java index bb33f6ea81..264198cf93 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily; +import org.apache.accumulo.core.fate.user.schema.FateSchema.TxAdminColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily; import org.apache.accumulo.core.security.Authorizations; @@ -69,7 +70,7 @@ public class FateMutatorImpl<T> implements FateMutator<T> { @Override public FateMutator<T> putStatus(TStatus status) { - TxColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name())); + TxAdminColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name())); return this; } @@ -96,8 +97,8 @@ public class FateMutatorImpl<T> implements FateMutator<T> { @Override public FateMutator<T> requireUnreserved() { Preconditions.checkState(!requiredUnreserved); - Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), - TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); + Condition condition = new Condition(TxAdminColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + TxAdminColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); mutation.addCondition(condition); requiredUnreserved = true; return this; @@ -114,23 +115,23 @@ public class FateMutatorImpl<T> implements FateMutator<T> { @Override public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) { requireUnreserved(); - TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(reservation.getSerialized())); + TxAdminColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(reservation.getSerialized())); return this; } @Override public FateMutator<T> putUnreserveTx(FateStore.FateReservation reservation) { - Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), - TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()) + Condition condition = new Condition(TxAdminColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + TxAdminColumnFamily.RESERVATION_COLUMN.getColumnQualifier()) .setValue(reservation.getSerialized()); mutation.addCondition(condition); - TxColumnFamily.RESERVATION_COLUMN.putDelete(mutation); + TxAdminColumnFamily.RESERVATION_COLUMN.putDelete(mutation); return this; } @Override public FateMutator<T> putFateOp(byte[] data) { - TxInfoColumnFamily.FATE_OP_COLUMN.put(mutation, new Value(data)); + TxAdminColumnFamily.FATE_OP_COLUMN.put(mutation, new Value(data)); return this; } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java b/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java index f6a6b3bef5..a0e19e1faf 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java @@ -58,7 +58,7 @@ public class RowFateStatusFilter extends WholeRowIterator { protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) { for (int i = 0; i < keys.size(); i++) { Key key = keys.get(i); - if (FateSchema.TxColumnFamily.STATUS_COLUMN.hasColumns(key) + if (FateSchema.TxAdminColumnFamily.STATUS_COLUMN.hasColumns(key) && valuesToAccept.contains(ReadOnlyFateStore.TStatus.valueOf(values.get(i).toString()))) { return true; } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java b/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java index 1a0fae5aa3..83538cbb1c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java @@ -18,7 +18,7 @@ */ package org.apache.accumulo.core.fate.user; -import static org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily.STATUS_COLUMN; +import static org.apache.accumulo.core.fate.user.schema.FateSchema.TxAdminColumnFamily.STATUS_COLUMN; import java.io.IOException; import java.util.Arrays; diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index 311c2890dd..d3d117c9fa 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -58,6 +58,7 @@ import org.apache.accumulo.core.fate.ReadOnlyRepo; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.StackOverflowException; import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily; +import org.apache.accumulo.core.fate.user.schema.FateSchema.TxAdminColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; @@ -224,8 +225,8 @@ public class UserFateStore<T> extends AbstractFateStore<T> { // attempt or was not written at all). try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { scanner.setRange(getRow(fateId)); - scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), - TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); + scanner.fetchColumn(TxAdminColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + TxAdminColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); FateReservation persistedRes = scanner.stream().map(entry -> FateReservation.deserialize(entry.getValue().get())) .findFirst().orElse(null); @@ -270,9 +271,10 @@ public class UserFateStore<T> extends AbstractFateStore<T> { Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY); scanner.setRange(new Range()); RowFateStatusFilter.configureScanner(scanner, statuses); - TxColumnFamily.STATUS_COLUMN.fetch(scanner); - TxColumnFamily.RESERVATION_COLUMN.fetch(scanner); - TxInfoColumnFamily.FATE_OP_COLUMN.fetch(scanner); + // columns fetched here must be in/added to TxAdminColumnFamily for locality group benefits + TxAdminColumnFamily.STATUS_COLUMN.fetch(scanner); + TxAdminColumnFamily.RESERVATION_COLUMN.fetch(scanner); + TxAdminColumnFamily.FATE_OP_COLUMN.fetch(scanner); return scanner.stream().onClose(scanner::close).map(e -> { String txUUIDStr = e.getKey().getRow().toString(); FateId fateId = FateId.from(fateInstanceType, txUUIDStr); @@ -296,13 +298,13 @@ public class UserFateStore<T> extends AbstractFateStore<T> { Text colq = entry.getKey().getColumnQualifier(); Value val = entry.getValue(); switch (colq.toString()) { - case TxColumnFamily.STATUS: + case TxAdminColumnFamily.STATUS: status = TStatus.valueOf(val.toString()); break; - case TxColumnFamily.RESERVATION: + case TxAdminColumnFamily.RESERVATION: reservation = FateReservation.deserialize(val.get()); break; - case TxInfoColumnFamily.FATE_OP: + case TxAdminColumnFamily.FATE_OP: fateOp = (Fate.FateOperation) deserializeTxInfo(TxInfo.FATE_OP, val.get()); break; default: @@ -352,7 +354,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { protected TStatus _getStatus(FateId fateId) { return scanTx(scanner -> { scanner.setRange(getRow(fateId)); - TxColumnFamily.STATUS_COLUMN.fetch(scanner); + TxAdminColumnFamily.STATUS_COLUMN.fetch(scanner); return scanner.stream().map(e -> TStatus.valueOf(e.getValue().toString())).findFirst() .orElse(TStatus.UNKNOWN); }); @@ -555,7 +557,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { final ColumnFQ cq; switch (txInfo) { case FATE_OP: - cq = TxInfoColumnFamily.FATE_OP_COLUMN; + cq = TxAdminColumnFamily.FATE_OP_COLUMN; break; case AUTO_CLEAN: cq = TxInfoColumnFamily.AUTO_CLEAN_COLUMN; diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java b/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java index 889ceac1e5..7377b5fc99 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java @@ -27,26 +27,18 @@ public class FateSchema { public static final String STR_NAME = "tx"; public static final Text NAME = new Text(STR_NAME); - public static final String STATUS = "status"; - public static final ColumnFQ STATUS_COLUMN = new ColumnFQ(NAME, new Text(STATUS)); - public static final String TX_KEY = "txkey"; public static final ColumnFQ TX_KEY_COLUMN = new ColumnFQ(NAME, new Text(TX_KEY)); public static final String CREATE_TIME = "ctime"; public static final ColumnFQ CREATE_TIME_COLUMN = new ColumnFQ(NAME, new Text(CREATE_TIME)); - public static final String RESERVATION = "reservation"; - public static final ColumnFQ RESERVATION_COLUMN = new ColumnFQ(NAME, new Text(RESERVATION)); } public static class TxInfoColumnFamily { public static final String STR_NAME = "txinfo"; public static final Text NAME = new Text(STR_NAME); - public static final String FATE_OP = "fateop"; - public static final ColumnFQ FATE_OP_COLUMN = new ColumnFQ(NAME, new Text(FATE_OP)); - public static final String AUTO_CLEAN = "autoclean"; public static final ColumnFQ AUTO_CLEAN_COLUMN = new ColumnFQ(NAME, new Text(AUTO_CLEAN)); @@ -65,4 +57,19 @@ public class FateSchema { public static final Text NAME = new Text(STR_NAME); } + // when FATE looks for work, this is the family scanned + public static class TxAdminColumnFamily { + public static final String STR_NAME = "txadmin"; + public static final Text NAME = new Text(STR_NAME); + + public static final String STATUS = "status"; + public static final ColumnFQ STATUS_COLUMN = new ColumnFQ(NAME, new Text(STATUS)); + + public static final String RESERVATION = "reservation"; + public static final ColumnFQ RESERVATION_COLUMN = new ColumnFQ(NAME, new Text(RESERVATION)); + + public static final String FATE_OP = "fateop"; + public static final ColumnFQ FATE_OP_COLUMN = new ColumnFQ(NAME, new Text(FATE_OP)); + } + } diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java index cba59893d6..08b58099e3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java @@ -88,11 +88,11 @@ public class InitialConfiguration { initialFateTableConf.putAll(commonConfig); initialFateTableConf.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "256M"); - // Create a locality group that contains status so its fast to scan. When fate looks for work is - // scans this family. - initialFateTableConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "status", - FateSchema.TxColumnFamily.STR_NAME); - initialFateTableConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "status"); + // Create a locality group that contains tx admin columns so its fast to scan. When fate + // looks for work it scans this family. + initialFateTableConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "txAdmin", + FateSchema.TxAdminColumnFamily.STR_NAME); + initialFateTableConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "txAdmin"); initialScanRefTableConf.putAll(commonConfig); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java index e4a4d2c159..30d45fedfb 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java @@ -36,7 +36,7 @@ import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.user.UserFateStore; -import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily; +import org.apache.accumulo.core.fate.user.schema.FateSchema; import org.apache.accumulo.core.iterators.user.VersioningIterator; import org.apache.accumulo.core.metadata.SystemTables; import org.apache.accumulo.core.security.Authorizations; @@ -124,7 +124,7 @@ public class UserFateIT extends FateIT { protected TStatus getTxStatus(ServerContext context, FateId fateId) { try (Scanner scanner = context.createScanner(table, Authorizations.EMPTY)) { scanner.setRange(getRow(fateId)); - TxColumnFamily.STATUS_COLUMN.fetch(scanner); + FateSchema.TxAdminColumnFamily.STATUS_COLUMN.fetch(scanner); return StreamSupport.stream(scanner.spliterator(), false) .map(e -> TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN); } catch (TableNotFoundException e) {