This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 19760b6d4a partially implements setting time for bulk imports (#3807)
19760b6d4a is described below

commit 19760b6d4a59b197b80b97b2be7a2d4255591db6
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Wed Oct 4 13:25:04 2023 -0400

    partially implements setting time for bulk imports (#3807)
    
    Implements setting time for unhosted tablets for bulk imports.  Setting
    time for a bulk import will fail if the tablet is hosted.  Before this
    commit it always failed.
    
    see #3354
---
 .../core/metadata/schema/MetadataTime.java         |  1 -
 .../metadata/ConditionalTabletMutatorImpl.java     |  8 ++++
 .../manager/tableOps/bulkVer2/LoadFiles.java       | 43 +++++++++++++++++-----
 .../test/functional/AmpleConditionalWriterIT.java  | 34 +++++++++++++++++
 .../apache/accumulo/test/functional/BulkNewIT.java | 11 ++++--
 5 files changed, 84 insertions(+), 13 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataTime.java 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataTime.java
index c5313527ec..eb96c309b9 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataTime.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataTime.java
@@ -121,5 +121,4 @@ public final class MetadataTime implements 
Comparable<MetadataTime> {
           "Cannot compare different time types: " + this + " and " + mtime);
     }
   }
-
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
index 7cbe7b298c..55b5d9f9c6 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
@@ -24,6 +24,7 @@ import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_COLUMN;
+import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.encodePrevEndRow;
 
@@ -222,6 +223,13 @@ public class ConditionalTabletMutatorImpl extends 
TabletMutatorBase<Ample.Condit
         mutation.addCondition(c);
       }
         break;
+      case TIME: {
+        Condition c =
+            new Condition(TIME_COLUMN.getColumnFamily(), 
TIME_COLUMN.getColumnQualifier());
+        c = c.setValue(tabletMetadata.getTime().encode());
+        mutation.addCondition(c);
+      }
+        break;
       default:
         throw new UnsupportedOperationException("Column type " + type + " is 
not supported.");
     }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index 6a12a050c0..60738c0475 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.manager.tableOps.bulkVer2;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -49,13 +50,12 @@ import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.tablets.TabletTime;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Make asynchronous load calls to each overlapping Tablet. This RepO does its 
work on the isReady
  * and will return a linear sleep value based on the largest number of Tablets 
on a TabletServer.
@@ -101,8 +101,6 @@ class LoadFiles extends ManagerRepo {
     Ample.ConditionalTabletsMutator conditionalMutator;
 
     void start(Path bulkDir, Manager manager, long tid, boolean setTime) 
throws Exception {
-      // ELASTICITY_TODO handle setting time... handle case where tablets are 
hosted and unhosted
-      Preconditions.checkArgument(!setTime);
       this.bulkDir = bulkDir;
       this.manager = manager;
       this.tid = tid;
@@ -115,9 +113,24 @@ class LoadFiles extends ManagerRepo {
       for (TabletMetadata tablet : tablets) {
         Map<ReferencedTabletFile,DataFileValue> filesToLoad = new HashMap<>();
 
+        if (setTime && tablet.getLocation() != null) {
+          throw new IllegalStateException("Setting time on hosted tablet is 
not implemented");
+        }
+
+        var tabletTime = TabletTime.getInstance(tablet.getTime());
+
         for (final Bulk.FileInfo fileInfo : files) {
-          filesToLoad.put(new ReferencedTabletFile(new Path(bulkDir, 
fileInfo.getFileName())),
-              new DataFileValue(fileInfo.getEstFileSize(), 
fileInfo.getEstNumEntries()));
+
+          DataFileValue dfv;
+
+          if (setTime) {
+            dfv = new DataFileValue(fileInfo.getEstFileSize(), 
fileInfo.getEstNumEntries(),
+                tabletTime.getAndUpdateTime());
+          } else {
+            dfv = new DataFileValue(fileInfo.getEstFileSize(), 
fileInfo.getEstNumEntries());
+          }
+
+          filesToLoad.put(new ReferencedTabletFile(new Path(bulkDir, 
fileInfo.getFileName())), dfv);
         }
 
         // remove any files that were already loaded
@@ -127,12 +140,24 @@ class LoadFiles extends ManagerRepo {
 
         if (!filesToLoad.isEmpty()) {
           // ELASTICITY_TODO lets automatically call require prev end row
-          var tabletMutator = 
conditionalMutator.mutateTablet(tablet.getExtent())
-              .requireAbsentOperation().requireSame(tablet, PREV_ROW, LOADED);
+          var tabletMutator =
+              
conditionalMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation();
+
+          if (setTime) {
+            tabletMutator.requireSame(tablet, PREV_ROW, LOADED, TIME, 
LOCATION);
+          } else {
+            tabletMutator.requireSame(tablet, PREV_ROW, LOADED);
+          }
 
           filesToLoad.forEach((f, v) -> {
             tabletMutator.putBulkFile(f, tid);
             tabletMutator.putFile(f, v);
+
+            if (setTime) {
+              // ELASTICITY_TODO this is not the correct thing to do when the 
tablet is hosted and
+              // could be harmful
+              tabletMutator.putTime(tabletTime.getMetadataTime());
+            }
           });
 
           tabletMutator.submit(tm -> false);
@@ -179,7 +204,7 @@ class LoadFiles extends ManagerRepo {
 
     Iterator<TabletMetadata> tabletIter =
         
TabletsMetadata.builder(manager.getContext()).forTable(tableId).overlapping(startRow,
 null)
-            .checkConsistency().fetch(PREV_ROW, LOCATION, 
LOADED).build().iterator();
+            .checkConsistency().fetch(PREV_ROW, LOCATION, LOADED, 
TIME).build().iterator();
 
     Loader loader = new Loader();
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index 396c42e50a..5eb9078192 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -27,6 +27,7 @@ import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -49,6 +50,7 @@ import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
@@ -61,6 +63,7 @@ import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.MetadataTime;
 import org.apache.accumulo.core.metadata.schema.SelectedFiles;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
@@ -719,4 +722,35 @@ public class AmpleConditionalWriterIT extends 
AccumuloClusterHarness {
     assertEquals(Status.ACCEPTED, results.get(RootTable.EXTENT).getStatus());
     assertEquals(7L, 
context.getAmple().readTablet(RootTable.EXTENT).getCompactId().getAsLong());
   }
+
+  @Test
+  public void testTime() {
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      var context = cluster.getServerContext();
+
+      for (var time : List.of(new MetadataTime(100, TimeType.LOGICAL),
+          new MetadataTime(100, TimeType.MILLIS), new MetadataTime(0, 
TimeType.LOGICAL))) {
+        var ctmi = new ConditionalTabletsMutatorImpl(context);
+        var tabletMeta1 = TabletMetadata.builder(e1).putTime(time).build();
+        
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, TIME)
+            .putTime(new MetadataTime(101, 
TimeType.LOGICAL)).submit(tabletMetadata -> false);
+        var results = ctmi.process();
+        assertEquals(Status.REJECTED, results.get(e1).getStatus());
+        assertEquals(new MetadataTime(0, TimeType.MILLIS),
+            context.getAmple().readTablet(e1).getTime());
+      }
+
+      for (int i = 0; i < 10; i++) {
+        var ctmi = new ConditionalTabletsMutatorImpl(context);
+        var tabletMeta1 =
+            TabletMetadata.builder(e1).putTime(new MetadataTime(i, 
TimeType.MILLIS)).build();
+        
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, TIME)
+            .putTime(new MetadataTime(i + 1, 
TimeType.MILLIS)).submit(tabletMetadata -> false);
+        var results = ctmi.process();
+        assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
+        assertEquals(new MetadataTime(i + 1, TimeType.MILLIS),
+            context.getAmple().readTablet(e1).getTime());
+      }
+    }
+  }
 }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
index f28b21960d..3e63bed845 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.admin.TimeType;
+import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
@@ -59,10 +60,12 @@ import org.apache.accumulo.core.data.LoadPlan;
 import org.apache.accumulo.core.data.LoadPlan.RangeType;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.metadata.UnreferencedTabletFile;
+import org.apache.accumulo.core.metadata.schema.MetadataTime;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.security.Authorizations;
@@ -82,7 +85,6 @@ import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -183,8 +185,6 @@ public class BulkNewIT extends SharedMiniClusterBase {
   }
 
   @Test
-  @Disabled("Need to implement set time functionality")
-  // ELASTICITY_TODO
   public void testSetTime() throws Exception {
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
       tableName = "testSetTime_table1";
@@ -193,6 +193,11 @@ public class BulkNewIT extends SharedMiniClusterBase {
       newTableConf.setTimeType(TimeType.LOGICAL);
       client.tableOperations().create(tableName, newTableConf);
       testSingleTabletSingleFile(client, false, true);
+
+      var ctx = (ClientContext) client;
+      var tabletTime = ctx.getAmple()
+          .readTablet(new KeyExtent(ctx.getTableId(tableName), new 
Text("0333"), null)).getTime();
+      assertEquals(new MetadataTime(1, TimeType.LOGICAL), tabletTime);
     }
   }
 

Reply via email to