This is an automated email from the ASF dual-hosted git repository.

krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d440c52d1 Status, Reservation, and Fate_Op columns are now in their 
own column family and locality group (#5031)
4d440c52d1 is described below

commit 4d440c52d157c205c900643d36d15f5a015a8277
Author: Arbaaz Khan <bazzy...@yahoo.com>
AuthorDate: Mon May 12 10:38:41 2025 -0400

    Status, Reservation, and Fate_Op columns are now in their own column family 
and locality group (#5031)
---
 .../accumulo/core/fate/user/FateMutator.java       | 11 +++++------
 .../accumulo/core/fate/user/FateMutatorImpl.java   | 17 ++++++++--------
 .../core/fate/user/RowFateStatusFilter.java        |  2 +-
 .../core/fate/user/StatusMappingIterator.java      |  2 +-
 .../accumulo/core/fate/user/UserFateStore.java     | 22 +++++++++++----------
 .../accumulo/core/fate/user/schema/FateSchema.java | 23 ++++++++++++++--------
 .../accumulo/server/init/InitialConfiguration.java | 10 +++++-----
 .../apache/accumulo/test/fate/user/UserFateIT.java |  4 ++--
 8 files changed, 50 insertions(+), 41 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java 
b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
index 0280dbf749..f8a6501147 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
@@ -24,6 +24,7 @@ import org.apache.accumulo.core.fate.FateKey;
 import org.apache.accumulo.core.fate.FateStore;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.fate.user.schema.FateSchema;
 
 public interface FateMutator<T> {
 
@@ -57,9 +58,8 @@ public interface FateMutator<T> {
   FateMutator<T> requireAbsentKey();
 
   /**
-   * Add a conditional mutation to
-   * {@link 
org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily#RESERVATION_COLUMN}
-   * that will put the reservation if there is not already a reservation 
present
+   * Add a conditional mutation to {@link 
FateSchema.TxAdminColumnFamily#RESERVATION_COLUMN} that
+   * will put the reservation if there is not already a reservation present
    *
    * @param reservation the reservation to attempt to put
    * @return the FateMutator with this added mutation
@@ -67,9 +67,8 @@ public interface FateMutator<T> {
   FateMutator<T> putReservedTx(FateStore.FateReservation reservation);
 
   /**
-   * Add a conditional mutation to
-   * {@link 
org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily#RESERVATION_COLUMN}
-   * that will delete the column if the column value matches the given 
reservation
+   * Add a conditional mutation to {@link 
FateSchema.TxAdminColumnFamily#RESERVATION_COLUMN} that
+   * will delete the column if the column value matches the given reservation
    *
    * @param reservation the reservation to attempt to remove
    * @return the FateMutator with this added mutation
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java 
b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
index bb33f6ea81..264198cf93 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.fate.FateStore;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily;
+import 
org.apache.accumulo.core.fate.user.schema.FateSchema.TxAdminColumnFamily;
 import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily;
 import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
@@ -69,7 +70,7 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
 
   @Override
   public FateMutator<T> putStatus(TStatus status) {
-    TxColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name()));
+    TxAdminColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name()));
     return this;
   }
 
@@ -96,8 +97,8 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
   @Override
   public FateMutator<T> requireUnreserved() {
     Preconditions.checkState(!requiredUnreserved);
-    Condition condition = new 
Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
-        TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
+    Condition condition = new 
Condition(TxAdminColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+        TxAdminColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
     mutation.addCondition(condition);
     requiredUnreserved = true;
     return this;
@@ -114,23 +115,23 @@ public class FateMutatorImpl<T> implements FateMutator<T> 
{
   @Override
   public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) {
     requireUnreserved();
-    TxColumnFamily.RESERVATION_COLUMN.put(mutation, new 
Value(reservation.getSerialized()));
+    TxAdminColumnFamily.RESERVATION_COLUMN.put(mutation, new 
Value(reservation.getSerialized()));
     return this;
   }
 
   @Override
   public FateMutator<T> putUnreserveTx(FateStore.FateReservation reservation) {
-    Condition condition = new 
Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
-        TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier())
+    Condition condition = new 
Condition(TxAdminColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+        TxAdminColumnFamily.RESERVATION_COLUMN.getColumnQualifier())
         .setValue(reservation.getSerialized());
     mutation.addCondition(condition);
-    TxColumnFamily.RESERVATION_COLUMN.putDelete(mutation);
+    TxAdminColumnFamily.RESERVATION_COLUMN.putDelete(mutation);
     return this;
   }
 
   @Override
   public FateMutator<T> putFateOp(byte[] data) {
-    TxInfoColumnFamily.FATE_OP_COLUMN.put(mutation, new Value(data));
+    TxAdminColumnFamily.FATE_OP_COLUMN.put(mutation, new Value(data));
     return this;
   }
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java
 
b/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java
index f6a6b3bef5..a0e19e1faf 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java
@@ -58,7 +58,7 @@ public class RowFateStatusFilter extends WholeRowIterator {
   protected boolean filter(Text currentRow, List<Key> keys, List<Value> 
values) {
     for (int i = 0; i < keys.size(); i++) {
       Key key = keys.get(i);
-      if (FateSchema.TxColumnFamily.STATUS_COLUMN.hasColumns(key)
+      if (FateSchema.TxAdminColumnFamily.STATUS_COLUMN.hasColumns(key)
           && 
valuesToAccept.contains(ReadOnlyFateStore.TStatus.valueOf(values.get(i).toString())))
 {
         return true;
       }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java
 
b/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java
index 1a0fae5aa3..83538cbb1c 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java
@@ -18,7 +18,7 @@
  */
 package org.apache.accumulo.core.fate.user;
 
-import static 
org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily.STATUS_COLUMN;
+import static 
org.apache.accumulo.core.fate.user.schema.FateSchema.TxAdminColumnFamily.STATUS_COLUMN;
 
 import java.io.IOException;
 import java.util.Arrays;
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
index 311c2890dd..d3d117c9fa 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@ -58,6 +58,7 @@ import org.apache.accumulo.core.fate.ReadOnlyRepo;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.fate.StackOverflowException;
 import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily;
+import 
org.apache.accumulo.core.fate.user.schema.FateSchema.TxAdminColumnFamily;
 import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily;
 import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
@@ -224,8 +225,8 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
       // attempt or was not written at all).
       try (Scanner scanner = context.createScanner(tableName, 
Authorizations.EMPTY)) {
         scanner.setRange(getRow(fateId));
-        
scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
-            TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
+        
scanner.fetchColumn(TxAdminColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+            TxAdminColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
         FateReservation persistedRes =
             scanner.stream().map(entry -> 
FateReservation.deserialize(entry.getValue().get()))
                 .findFirst().orElse(null);
@@ -270,9 +271,10 @@ public class UserFateStore<T> extends AbstractFateStore<T> 
{
       Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY);
       scanner.setRange(new Range());
       RowFateStatusFilter.configureScanner(scanner, statuses);
-      TxColumnFamily.STATUS_COLUMN.fetch(scanner);
-      TxColumnFamily.RESERVATION_COLUMN.fetch(scanner);
-      TxInfoColumnFamily.FATE_OP_COLUMN.fetch(scanner);
+      // columns fetched here must be in/added to TxAdminColumnFamily for 
locality group benefits
+      TxAdminColumnFamily.STATUS_COLUMN.fetch(scanner);
+      TxAdminColumnFamily.RESERVATION_COLUMN.fetch(scanner);
+      TxAdminColumnFamily.FATE_OP_COLUMN.fetch(scanner);
       return scanner.stream().onClose(scanner::close).map(e -> {
         String txUUIDStr = e.getKey().getRow().toString();
         FateId fateId = FateId.from(fateInstanceType, txUUIDStr);
@@ -296,13 +298,13 @@ public class UserFateStore<T> extends 
AbstractFateStore<T> {
           Text colq = entry.getKey().getColumnQualifier();
           Value val = entry.getValue();
           switch (colq.toString()) {
-            case TxColumnFamily.STATUS:
+            case TxAdminColumnFamily.STATUS:
               status = TStatus.valueOf(val.toString());
               break;
-            case TxColumnFamily.RESERVATION:
+            case TxAdminColumnFamily.RESERVATION:
               reservation = FateReservation.deserialize(val.get());
               break;
-            case TxInfoColumnFamily.FATE_OP:
+            case TxAdminColumnFamily.FATE_OP:
               fateOp = (Fate.FateOperation) deserializeTxInfo(TxInfo.FATE_OP, 
val.get());
               break;
             default:
@@ -352,7 +354,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
   protected TStatus _getStatus(FateId fateId) {
     return scanTx(scanner -> {
       scanner.setRange(getRow(fateId));
-      TxColumnFamily.STATUS_COLUMN.fetch(scanner);
+      TxAdminColumnFamily.STATUS_COLUMN.fetch(scanner);
       return scanner.stream().map(e -> 
TStatus.valueOf(e.getValue().toString())).findFirst()
           .orElse(TStatus.UNKNOWN);
     });
@@ -555,7 +557,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
         final ColumnFQ cq;
         switch (txInfo) {
           case FATE_OP:
-            cq = TxInfoColumnFamily.FATE_OP_COLUMN;
+            cq = TxAdminColumnFamily.FATE_OP_COLUMN;
             break;
           case AUTO_CLEAN:
             cq = TxInfoColumnFamily.AUTO_CLEAN_COLUMN;
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java 
b/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java
index 889ceac1e5..7377b5fc99 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java
@@ -27,26 +27,18 @@ public class FateSchema {
     public static final String STR_NAME = "tx";
     public static final Text NAME = new Text(STR_NAME);
 
-    public static final String STATUS = "status";
-    public static final ColumnFQ STATUS_COLUMN = new ColumnFQ(NAME, new 
Text(STATUS));
-
     public static final String TX_KEY = "txkey";
     public static final ColumnFQ TX_KEY_COLUMN = new ColumnFQ(NAME, new 
Text(TX_KEY));
 
     public static final String CREATE_TIME = "ctime";
     public static final ColumnFQ CREATE_TIME_COLUMN = new ColumnFQ(NAME, new 
Text(CREATE_TIME));
 
-    public static final String RESERVATION = "reservation";
-    public static final ColumnFQ RESERVATION_COLUMN = new ColumnFQ(NAME, new 
Text(RESERVATION));
   }
 
   public static class TxInfoColumnFamily {
     public static final String STR_NAME = "txinfo";
     public static final Text NAME = new Text(STR_NAME);
 
-    public static final String FATE_OP = "fateop";
-    public static final ColumnFQ FATE_OP_COLUMN = new ColumnFQ(NAME, new 
Text(FATE_OP));
-
     public static final String AUTO_CLEAN = "autoclean";
     public static final ColumnFQ AUTO_CLEAN_COLUMN = new ColumnFQ(NAME, new 
Text(AUTO_CLEAN));
 
@@ -65,4 +57,19 @@ public class FateSchema {
     public static final Text NAME = new Text(STR_NAME);
   }
 
+  // when FATE looks for work, this is the family scanned
+  public static class TxAdminColumnFamily {
+    public static final String STR_NAME = "txadmin";
+    public static final Text NAME = new Text(STR_NAME);
+
+    public static final String STATUS = "status";
+    public static final ColumnFQ STATUS_COLUMN = new ColumnFQ(NAME, new 
Text(STATUS));
+
+    public static final String RESERVATION = "reservation";
+    public static final ColumnFQ RESERVATION_COLUMN = new ColumnFQ(NAME, new 
Text(RESERVATION));
+
+    public static final String FATE_OP = "fateop";
+    public static final ColumnFQ FATE_OP_COLUMN = new ColumnFQ(NAME, new 
Text(FATE_OP));
+  }
+
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java
 
b/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java
index cba59893d6..08b58099e3 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java
@@ -88,11 +88,11 @@ public class InitialConfiguration {
 
     initialFateTableConf.putAll(commonConfig);
     initialFateTableConf.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "256M");
-    // Create a locality group that contains status so its fast to scan. When 
fate looks for work is
-    // scans this family.
-    initialFateTableConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + 
"status",
-        FateSchema.TxColumnFamily.STR_NAME);
-    initialFateTableConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), 
"status");
+    // Create a locality group that contains tx admin columns so its fast to 
scan. When fate
+    // looks for work it scans this family.
+    initialFateTableConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + 
"txAdmin",
+        FateSchema.TxAdminColumnFamily.STR_NAME);
+    initialFateTableConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), 
"txAdmin");
 
     initialScanRefTableConf.putAll(commonConfig);
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java
index e4a4d2c159..30d45fedfb 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java
@@ -36,7 +36,7 @@ import 
org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 import org.apache.accumulo.core.fate.user.UserFateStore;
-import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily;
+import org.apache.accumulo.core.fate.user.schema.FateSchema;
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
 import org.apache.accumulo.core.metadata.SystemTables;
 import org.apache.accumulo.core.security.Authorizations;
@@ -124,7 +124,7 @@ public class UserFateIT extends FateIT {
   protected TStatus getTxStatus(ServerContext context, FateId fateId) {
     try (Scanner scanner = context.createScanner(table, Authorizations.EMPTY)) 
{
       scanner.setRange(getRow(fateId));
-      TxColumnFamily.STATUS_COLUMN.fetch(scanner);
+      FateSchema.TxAdminColumnFamily.STATUS_COLUMN.fetch(scanner);
       return StreamSupport.stream(scanner.spliterator(), false)
           .map(e -> 
TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN);
     } catch (TableNotFoundException e) {

Reply via email to