This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new edda158a54 Modified SetEncodingIterator to include Value (#4486) edda158a54 is described below commit edda158a54b1769066e8d9c2dfe97fecee419dd2 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Fri May 10 09:21:24 2024 -0400 Modified SetEncodingIterator to include Value (#4486) Renamed SetEqualityIterator to SetEncodingIterator. Added a mandatory iterator option to determine whether the Value should also be encoded for the equality checks that occur for the Conditional mutations. Fixes #3522 Co-authored-by: Christopher L. Shannon <cshan...@apache.org> Co-authored-by: Dom G. <domgargu...@apache.org> --- .../metadata/ConditionalTabletMutatorImpl.java | 21 +++--- ...alityIterator.java => SetEncodingIterator.java} | 75 +++++++++++++++++++--- ...ratorTest.java => SetEncodingIteratorTest.java} | 69 +++++++++++++------- .../test/functional/AmpleConditionalWriterIT.java | 18 +++++- 4 files changed, 142 insertions(+), 41 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index 3e87241c6d..381b3e112e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -55,10 +55,11 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.metadata.schema.TabletMutatorBase; import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.metadata.iterators.LocationExistsIterator; import 
org.apache.accumulo.server.metadata.iterators.PresentIterator; -import org.apache.accumulo.server.metadata.iterators.SetEqualityIterator; +import org.apache.accumulo.server.metadata.iterators.SetEncodingIterator; import org.apache.accumulo.server.metadata.iterators.TabletExistsIterator; import com.google.common.base.Preconditions; @@ -172,16 +173,18 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit case PREV_ROW: throw new IllegalStateException("PREV_ROW already set from Extent"); case LOGS: { - Condition c = SetEqualityIterator.createCondition(new HashSet<>(tabletMetadata.getLogs()), + Condition c = SetEncodingIterator.createCondition(new HashSet<>(tabletMetadata.getLogs()), logEntry -> logEntry.getColumnQualifier().toString().getBytes(UTF_8), LogColumnFamily.NAME); mutation.addCondition(c); } break; case FILES: { - // ELASTICITY_TODO compare values? - Condition c = SetEqualityIterator.createCondition(tabletMetadata.getFiles(), - stf -> stf.getMetadata().getBytes(UTF_8), DataFileColumnFamily.NAME); + Condition c = + SetEncodingIterator.createConditionWithVal(tabletMetadata.getFilesMap().entrySet(), + entry -> new Pair<>(entry.getKey().getMetadata().getBytes(UTF_8), + entry.getValue().encode()), + DataFileColumnFamily.NAME); mutation.addCondition(c); } break; @@ -199,7 +202,7 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit break; case ECOMP: { Condition c = - SetEqualityIterator.createCondition(tabletMetadata.getExternalCompactions().keySet(), + SetEncodingIterator.createCondition(tabletMetadata.getExternalCompactions().keySet(), ecid -> ecid.canonical().getBytes(UTF_8), ExternalCompactionColumnFamily.NAME); mutation.addCondition(c); } @@ -212,13 +215,13 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit } break; case LOADED: { - Condition c = SetEqualityIterator.createCondition(tabletMetadata.getLoaded().keySet(), + Condition c = 
SetEncodingIterator.createCondition(tabletMetadata.getLoaded().keySet(), stf -> stf.getMetadata().getBytes(UTF_8), BulkFileColumnFamily.NAME); mutation.addCondition(c); } break; case COMPACTED: { - Condition c = SetEqualityIterator.createCondition(tabletMetadata.getCompacted(), + Condition c = SetEncodingIterator.createCondition(tabletMetadata.getCompacted(), fTid -> fTid.canonical().getBytes(UTF_8), CompactedColumnFamily.NAME); mutation.addCondition(c); } @@ -241,7 +244,7 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit break; case USER_COMPACTION_REQUESTED: { Condition c = - SetEqualityIterator.createCondition(tabletMetadata.getUserCompactionsRequested(), + SetEncodingIterator.createCondition(tabletMetadata.getUserCompactionsRequested(), fTid -> fTid.canonical().getBytes(UTF_8), UserCompactionRequestedColumnFamily.NAME); mutation.addCondition(c); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEqualityIterator.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java similarity index 62% rename from server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEqualityIterator.java rename to server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java index c5314b4467..af878263d3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEqualityIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.server.metadata.iterators; +import static java.nio.charset.StandardCharsets.UTF_8; + import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -38,23 +40,36 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; import 
org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl; import org.apache.hadoop.io.Text; import com.google.common.base.Preconditions; /** - * This iterator exists to enable checking for set equality in a conditional mutation. It allows - * comparing a set in a client process to a set encoded in column qualifiers within a tablet. + * This iterator exists to enable checking for set equality in a conditional mutation. The + * createCondition methods allow the client to create conditions for specific column families in a + * tablets metadata. The conditions will check for equality based on the value in the column + * qualifier or values in the column qualifier and Value. + * + * <h2>Options</h2> + * <ul> + * <li><b>concat.value:</b> This option must be supplied. If true, then the bytes from the Value + * will be concatenated with a null byte separator.</li> + * </ul> */ -public class SetEqualityIterator implements SortedKeyValueIterator<Key,Value> { +public class SetEncodingIterator implements SortedKeyValueIterator<Key,Value> { - // ELASTICITY_TODO unit test this iterator + public static final String CONCAT_VALUE = "concat.value"; + private static final String VALUE_SEPARATOR = "\u0000"; + private static final byte[] VALUE_SEPARATOR_BYTES = VALUE_SEPARATOR.getBytes(UTF_8); + private static final int VALUE_SEPARATOR_BYTES_LENGTH = VALUE_SEPARATOR_BYTES.length; private SortedKeyValueIterator<Key,Value> source; private Key startKey = null; private Value topValue = null; + private boolean concat = false; @Override public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) @@ -79,14 +94,21 @@ public class SetEqualityIterator implements SortedKeyValueIterator<Key,Value> { int count = 0; while (source.hasTop()) { + final byte[] bytesToWrite; byte[] ba = source.getTopKey().getColumnQualifierData().toArray(); - 
dos.writeInt(ba.length); - dos.write(ba, 0, ba.length); + if (concat) { + byte[] val = source.getTopValue().get(); + bytesToWrite = encodeKeyValue(ba, val); + } else { + bytesToWrite = ba; + } + dos.writeInt(bytesToWrite.length); + dos.write(bytesToWrite, 0, bytesToWrite.length); source.next(); count++; } - // The lenght is written last so that buffering can be avoided in this iterator. + // The length is written last so that buffering can be avoided in this iterator. dos.writeInt(count); topValue = new Value(baos.toByteArray()); @@ -138,7 +160,13 @@ public class SetEqualityIterator implements SortedKeyValueIterator<Key,Value> { @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { + String concat = options.get(CONCAT_VALUE); + if (concat == null || !(concat.equalsIgnoreCase("true") || concat.equalsIgnoreCase("false"))) { + throw new IllegalArgumentException( + CONCAT_VALUE + " option must be supplied with a value of 'true' or 'false'"); + } this.source = source; + this.concat = Boolean.parseBoolean(concat); } @Override @@ -176,14 +204,45 @@ public class SetEqualityIterator implements SortedKeyValueIterator<Key,Value> { } } + private static byte[] encodeKeyValue(byte[] key, byte[] val) { + var bytesToWrite = new byte[key.length + VALUE_SEPARATOR_BYTES_LENGTH + val.length]; + System.arraycopy(key, 0, bytesToWrite, 0, key.length); + System.arraycopy(VALUE_SEPARATOR_BYTES, 0, bytesToWrite, key.length, + VALUE_SEPARATOR_BYTES_LENGTH); + System.arraycopy(val, 0, bytesToWrite, key.length + VALUE_SEPARATOR_BYTES_LENGTH, val.length); + return bytesToWrite; + } + private static final Text EMPTY = new Text(); + /* + * Create a condition that will check the column qualifier values of the rows in the tablets + * metadata with the matching family against a set of values produced by the encoder function. 
+ */ public static <T> Condition createCondition(Collection<T> set, Function<T,byte[]> encoder, Text family) { Preconditions.checkArgument(set instanceof Set); IteratorSetting is = new IteratorSetting(ConditionalTabletMutatorImpl.INITIAL_ITERATOR_PRIO, - SetEqualityIterator.class); + SetEncodingIterator.class); + is.addOption(SetEncodingIterator.CONCAT_VALUE, Boolean.toString(false)); return new Condition(family, EMPTY).setValue(encode((Set<T>) set, encoder)).setIterators(is); } + /* + * Create a condition that will check the column qualifier and Value values of the rows in the + * tablets metadata with the matching family against a set of values produced by the encoder + * function. + */ + public static <T> Condition createConditionWithVal(Collection<T> set, + Function<T,Pair<byte[],byte[]>> encoder, Text family) { + Preconditions.checkArgument(set instanceof Set); + IteratorSetting is = new IteratorSetting(ConditionalTabletMutatorImpl.INITIAL_ITERATOR_PRIO, + SetEncodingIterator.class); + is.addOption(SetEncodingIterator.CONCAT_VALUE, Boolean.toString(true)); + return new Condition(family, EMPTY).setValue(encode((Set<T>) set, s -> { + Pair<byte[],byte[]> kv = encoder.apply(s); + return encodeKeyValue(kv.getFirst(), kv.getSecond()); + })).setIterators(is); + } + } diff --git a/server/base/src/test/java/org/apache/accumulo/server/SetEqualityIteratorTest.java b/server/base/src/test/java/org/apache/accumulo/server/SetEncodingIteratorTest.java similarity index 72% rename from server/base/src/test/java/org/apache/accumulo/server/SetEqualityIteratorTest.java rename to server/base/src/test/java/org/apache/accumulo/server/SetEncodingIteratorTest.java index 3d6c11f9b8..90195493cd 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/SetEqualityIteratorTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/SetEncodingIteratorTest.java @@ -21,10 +21,11 @@ package org.apache.accumulo.server; import static java.nio.charset.StandardCharsets.UTF_8; 
import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.io.IOException; import java.util.Collections; -import java.util.Set; +import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; @@ -39,17 +40,20 @@ import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.server.metadata.iterators.SetEqualityIterator; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.server.metadata.iterators.SetEncodingIterator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -public class SetEqualityIteratorTest { +public class SetEncodingIteratorTest { - private SetEqualityIterator setEqualityIterator; - private SetEqualityIterator setEqualityIteratorNoFiles; - private SetEqualityIterator setEqualityIteratorOneFile; + private TabletMetadata tmOneFile; + private TabletMetadata tmMultipleFiles; + private SetEncodingIterator setEqualityIterator; + private SetEncodingIterator setEqualityIteratorNoFiles; + private SetEncodingIterator setEqualityIteratorOneFile; private SortedMapIterator sortedMapIterator; private SortedMapIterator sortedMapIteratorNoFiles; private SortedMapIterator sortedMapIteratorOneFile; @@ -72,13 +76,13 @@ public class SetEqualityIteratorTest { // Create tablet metadata with one file StoredTabletFile singleFile = new ReferencedTabletFile(new Path("dfs://nn1/acc/tables/1/t-0001/sf1.rf")).insert(); - TabletMetadata tmOneFile = TabletMetadata.builder(extent) - .putFile(singleFile, new DataFileValue(100, 50)).putFlushId(8).build(); + tmOneFile = 
TabletMetadata.builder(extent).putFile(singleFile, new DataFileValue(100, 50)) + .putFlushId(8).build(); // Create tablet metadata with multiple files - TabletMetadata tmMultipleFiles = TabletMetadata.builder(extent) - .putFile(file1, new DataFileValue(0, 0)).putFile(file2, new DataFileValue(555, 23)) - .putFile(file3, new DataFileValue(234, 13)).putFlushId(6).build(); + tmMultipleFiles = TabletMetadata.builder(extent).putFile(file1, new DataFileValue(0, 0)) + .putFile(file2, new DataFileValue(555, 23)).putFile(file3, new DataFileValue(234, 13)) + .putFlushId(6).build(); var extent2 = new KeyExtent(extent.tableId(), null, extent.endRow()); // create another tablet metadata using extent2 w/ diff files and add it to sortedMap. This @@ -107,12 +111,15 @@ public class SetEqualityIteratorTest { sortedMapIteratorOneFile = new SortedMapIterator(sortedMapOneFile); // Set the SortedMapIterator as the source for SetEqualityIterator - setEqualityIterator = new SetEqualityIterator(); - setEqualityIterator.init(sortedMapIterator, Collections.emptyMap(), null); - setEqualityIteratorNoFiles = new SetEqualityIterator(); - setEqualityIteratorNoFiles.init(sortedMapIteratorNoFiles, Collections.emptyMap(), null); - setEqualityIteratorOneFile = new SetEqualityIterator(); - setEqualityIteratorOneFile.init(sortedMapIteratorOneFile, Collections.emptyMap(), null); + setEqualityIterator = new SetEncodingIterator(); + setEqualityIterator.init(sortedMapIterator, Map.of(SetEncodingIterator.CONCAT_VALUE, "true"), + null); + setEqualityIteratorNoFiles = new SetEncodingIterator(); + setEqualityIteratorNoFiles.init(sortedMapIteratorNoFiles, + Map.of(SetEncodingIterator.CONCAT_VALUE, "false"), null); + setEqualityIteratorOneFile = new SetEncodingIterator(); + setEqualityIteratorOneFile.init(sortedMapIteratorOneFile, + Map.of(SetEncodingIterator.CONCAT_VALUE, "true"), null); } @Test @@ -129,7 +136,7 @@ public class SetEqualityIteratorTest { // Asserting the result assertEquals(new Key(tabletRow, 
family), setEqualityIteratorNoFiles.getTopKey()); // The iterator should produce a value that is equal to the expected value on the condition - var condition = SetEqualityIterator.createCondition(Collections.emptySet(), + var condition = SetEncodingIterator.createCondition(Collections.emptySet(), storedTabletFile -> ((StoredTabletFile) storedTabletFile).getMetadata().getBytes(UTF_8), family); assertArrayEquals(condition.getValue().toArray(), @@ -150,8 +157,10 @@ public class SetEqualityIteratorTest { // Asserting the result assertEquals(new Key(tabletRow, family), setEqualityIteratorOneFile.getTopKey()); // The iterator should produce a value that is equal to the expected value on the condition - var condition = SetEqualityIterator.createCondition(Collections.singleton(file1), - storedTabletFile -> storedTabletFile.getMetadata().getBytes(UTF_8), family); + var condition = SetEncodingIterator.createConditionWithVal(tmOneFile.getFilesMap().entrySet(), + entry -> new Pair<>(entry.getKey().getMetadata().getBytes(UTF_8), + entry.getValue().encode()), + family); assertArrayEquals(condition.getValue().toArray(), setEqualityIteratorOneFile.getTopValue().get()); } @@ -170,10 +179,26 @@ public class SetEqualityIteratorTest { // Asserting the result assertEquals(new Key(tabletRow, family), setEqualityIterator.getTopKey()); // The iterator should produce a value that is equal to the expected value on the condition - var condition = SetEqualityIterator.createCondition(Set.of(file1, file2, file3), - storedTabletFile -> storedTabletFile.getMetadata().getBytes(UTF_8), family); + var condition = + SetEncodingIterator.createConditionWithVal(tmMultipleFiles.getFilesMap().entrySet(), + entry -> new Pair<>(entry.getKey().getMetadata().getBytes(UTF_8), + entry.getValue().encode()), + family); assertArrayEquals(condition.getValue().toArray(), setEqualityIterator.getTopValue().get()); } + @Test + public void testInvalidConcatValueOption() throws IOException { + SetEncodingIterator iter = 
new SetEncodingIterator(); + iter.init(null, Map.of(SetEncodingIterator.CONCAT_VALUE, "true"), null); + iter.init(null, Map.of(SetEncodingIterator.CONCAT_VALUE, "false"), null); + assertThrows(IllegalArgumentException.class, () -> iter.init(null, Map.of(), null)); + assertThrows(IllegalArgumentException.class, + () -> iter.init(null, Map.of(SetEncodingIterator.CONCAT_VALUE, "yes"), null)); + assertThrows(IllegalArgumentException.class, + () -> iter.init(null, Map.of(SetEncodingIterator.CONCAT_VALUE, ""), null)); + + } + } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index 4a05fad955..a077edf7be 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -319,6 +319,19 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { .putFile(stf4, new DataFileValue(0, 0)).deleteFile(stf1).deleteFile(stf2).deleteFile(stf3) .submit(tm -> false); results = ctmi.process(); + // First attempt should fail because the dfvs were replaced in the test + // so the values of the files will not match + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + + // Try again with the correct compacted datafiles + var compactedDv = new DataFileValue(0, 0); + ctmi = new ConditionalTabletsMutatorImpl(context); + tm5 = TabletMetadata.builder(e1).putFile(stf1, compactedDv).putFile(stf2, compactedDv) + .putFile(stf3, compactedDv).build(); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm5, FILES) + .putFile(stf4, new DataFileValue(0, 0)).deleteFile(stf1).deleteFile(stf2).deleteFile(stf3) + .submit(tm -> false); + results = ctmi.process(); assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); assertEquals(Set.of(stf4), context.getAmple().readTablet(e1).getFiles()); @@ -332,7 +345,7 @@ 
public class AmpleConditionalWriterIT extends AccumuloClusterHarness { FateId fateId = FateId.from(type, UUID.randomUUID()); ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm6, LOADED) .putFile(stf5, new DataFileValue(0, 0)).putBulkFile(stf5.getTabletFile(), fateId) - .putFile(stf5, new DataFileValue(0, 0)).submit(tm -> false); + .submit(tm -> false); results = ctmi.process(); assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); @@ -342,7 +355,8 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { var stf6 = StoredTabletFile .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/A0000075.rf")); ctmi = new ConditionalTabletsMutatorImpl(context); - var tm7 = TabletMetadata.builder(e1).putFile(stf4, dfv).putFile(stf5, dfv).build(); + var tm7 = + TabletMetadata.builder(e1).putFile(stf4, compactedDv).putFile(stf5, compactedDv).build(); ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm7, FILES) .putFile(stf6, new DataFileValue(0, 0)).deleteFile(stf4).deleteFile(stf5) .submit(tm -> false);