This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 19760b6d4a partially implements setting time for bulk imports (#3807) 19760b6d4a is described below commit 19760b6d4a59b197b80b97b2be7a2d4255591db6 Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Oct 4 13:25:04 2023 -0400 partially implements setting time for bulk imports (#3807) Implements setting time for unhosted tablets for bulk imports. Setting time for a bulk import will fail if the tablet is hosted. Before this commit it always failed. see #3354 --- .../core/metadata/schema/MetadataTime.java | 1 - .../metadata/ConditionalTabletMutatorImpl.java | 8 ++++ .../manager/tableOps/bulkVer2/LoadFiles.java | 43 +++++++++++++++++----- .../test/functional/AmpleConditionalWriterIT.java | 34 +++++++++++++++++ .../apache/accumulo/test/functional/BulkNewIT.java | 11 ++++-- 5 files changed, 84 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataTime.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataTime.java index c5313527ec..eb96c309b9 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataTime.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataTime.java @@ -121,5 +121,4 @@ public final class MetadataTime implements Comparable<MetadataTime> { "Cannot compare different time types: " + this + " and " + mtime); } } - } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index 7cbe7b298c..55b5d9f9c6 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -24,6 +24,7 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.encodePrevEndRow; @@ -222,6 +223,13 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit mutation.addCondition(c); } break; + case TIME: { + Condition c = + new Condition(TIME_COLUMN.getColumnFamily(), TIME_COLUMN.getColumnQualifier()); + c = c.setValue(tabletMetadata.getTime().encode()); + mutation.addCondition(c); + } + break; default: throw new UnsupportedOperationException("Column type " + type + " is not supported."); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index 6a12a050c0..60738c0475 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -21,6 +21,7 @@ package org.apache.accumulo.manager.tableOps.bulkVer2; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME; import java.util.ArrayList; import java.util.Comparator; @@ -49,13 +50,12 @@ import org.apache.accumulo.core.util.PeekingIterator; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.tablets.TabletTime; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - /** * Make asynchronous load calls to each overlapping Tablet. This RepO does its work on the isReady * and will return a linear sleep value based on the largest number of Tablets on a TabletServer. @@ -101,8 +101,6 @@ class LoadFiles extends ManagerRepo { Ample.ConditionalTabletsMutator conditionalMutator; void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception { - // ELASTICITY_TODO handle setting time... handle case where tablets are hosted and unhosted - Preconditions.checkArgument(!setTime); this.bulkDir = bulkDir; this.manager = manager; this.tid = tid; @@ -115,9 +113,24 @@ class LoadFiles extends ManagerRepo { for (TabletMetadata tablet : tablets) { Map<ReferencedTabletFile,DataFileValue> filesToLoad = new HashMap<>(); + if (setTime && tablet.getLocation() != null) { + throw new IllegalStateException("Setting time on hosted tablet is not implemented"); + } + + var tabletTime = TabletTime.getInstance(tablet.getTime()); + for (final Bulk.FileInfo fileInfo : files) { - filesToLoad.put(new ReferencedTabletFile(new Path(bulkDir, fileInfo.getFileName())), - new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries())); + + DataFileValue dfv; + + if (setTime) { + dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), + tabletTime.getAndUpdateTime()); + } else { + dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries()); + } + + filesToLoad.put(new ReferencedTabletFile(new Path(bulkDir, fileInfo.getFileName())), dfv); } // remove any files that were already loaded @@ -127,12 +140,24 @@ class LoadFiles extends ManagerRepo { if (!filesToLoad.isEmpty()) { // ELASTICITY_TODO lets automatically call require prev end row - var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent()) - .requireAbsentOperation().requireSame(tablet, PREV_ROW, LOADED); + var tabletMutator = + conditionalMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation(); + + if (setTime) { + tabletMutator.requireSame(tablet, PREV_ROW, LOADED, TIME, LOCATION); + } else { + tabletMutator.requireSame(tablet, PREV_ROW, LOADED); + } filesToLoad.forEach((f, v) -> { tabletMutator.putBulkFile(f, tid); tabletMutator.putFile(f, v); + + if (setTime) { + // ELASTICITY_TODO this is not the correct thing to do when the tablet is hosted and + // could be harmful + tabletMutator.putTime(tabletTime.getMetadataTime()); + } }); tabletMutator.submit(tm -> false); @@ -179,7 +204,7 @@ class LoadFiles extends ManagerRepo { Iterator<TabletMetadata> tabletIter = TabletsMetadata.builder(manager.getContext()).forTable(tableId).overlapping(startRow, null) - .checkConsistency().fetch(PREV_ROW, LOCATION, LOADED).build().iterator(); + .checkConsistency().fetch(PREV_ROW, LOCATION, LOADED, TIME).build().iterator(); Loader loader = new Loader(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index 396c42e50a..5eb9078192 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -27,6 +27,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -49,6 +50,7 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; @@ -61,6 +63,7 @@ import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.MetadataTime; import org.apache.accumulo.core.metadata.schema.SelectedFiles; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; @@ -719,4 +722,35 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { assertEquals(Status.ACCEPTED, results.get(RootTable.EXTENT).getStatus()); assertEquals(7L, context.getAmple().readTablet(RootTable.EXTENT).getCompactId().getAsLong()); } + + @Test + public void testTime() { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + var context = cluster.getServerContext(); + + for (var time : List.of(new MetadataTime(100, TimeType.LOGICAL), + new MetadataTime(100, TimeType.MILLIS), new MetadataTime(0, TimeType.LOGICAL))) { + var ctmi = new ConditionalTabletsMutatorImpl(context); + var tabletMeta1 = TabletMetadata.builder(e1).putTime(time).build(); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, TIME) + .putTime(new MetadataTime(101, TimeType.LOGICAL)).submit(tabletMetadata -> false); + var results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + assertEquals(new MetadataTime(0, TimeType.MILLIS), + context.getAmple().readTablet(e1).getTime()); + } + + for (int i = 0; i < 10; i++) { + var ctmi = new ConditionalTabletsMutatorImpl(context); + var tabletMeta1 = + TabletMetadata.builder(e1).putTime(new MetadataTime(i, TimeType.MILLIS)).build(); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, TIME) + .putTime(new MetadataTime(i + 1, TimeType.MILLIS)).submit(tabletMetadata -> false); + var results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + assertEquals(new MetadataTime(i + 1, TimeType.MILLIS), + context.getAmple().readTablet(e1).getTime()); + } + } + } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index f28b21960d..3e63bed845 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@ -52,6 +52,7 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TimeType; +import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; @@ -59,10 +60,12 @@ import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.data.LoadPlan.RangeType; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.metadata.UnreferencedTabletFile; +import org.apache.accumulo.core.metadata.schema.MetadataTime; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.Authorizations; @@ -82,7 +85,6 @@ import org.apache.hadoop.io.Text; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -183,8 +185,6 @@ public class BulkNewIT extends SharedMiniClusterBase { } @Test - @Disabled("Need to implement set time functionality") - // ELASTICITY_TODO public void testSetTime() throws Exception { try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { tableName = "testSetTime_table1"; @@ -193,6 +193,11 @@ public class BulkNewIT extends SharedMiniClusterBase { newTableConf.setTimeType(TimeType.LOGICAL); client.tableOperations().create(tableName, newTableConf); testSingleTabletSingleFile(client, false, true); + + var ctx = (ClientContext) client; + var tabletTime = ctx.getAmple() + .readTablet(new KeyExtent(ctx.getTableId(tableName), new Text("0333"), null)).getTime(); + assertEquals(new MetadataTime(1, TimeType.LOGICAL), tabletTime); } }