This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 17b716a2af adds iterator for filtering on fate key type (#4590) 17b716a2af is described below commit 17b716a2af834d980f7db6b0a716269a9270449b Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri May 24 12:42:25 2024 -0400 adds iterator for filtering on fate key type (#4590) This new iterator plus the locality group added in #4589 should make finding fate transactions with a given key type much more efficient. --- .../org/apache/accumulo/core/fate/FateKey.java | 9 ++++ .../accumulo/core/fate/user/FateKeyFilter.java | 54 ++++++++++++++++++++++ .../accumulo/core/fate/user/UserFateStore.java | 4 +- 3 files changed, 65 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java index 2b4238d087..6c1663627c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java @@ -100,6 +100,15 @@ public class FateKey { return new FateKey(serialized); } + public static FateKeyType deserializeType(byte[] serialized) { + try (DataInputBuffer buffer = new DataInputBuffer()) { + buffer.reset(serialized, serialized.length); + return FateKeyType.valueOf(buffer.readUTF()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + public static FateKey forSplit(KeyExtent extent) { return new FateKey(FateKeyType.SPLIT, extent); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateKeyFilter.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateKeyFilter.java new file mode 100644 index 0000000000..a4ae0357d0 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateKeyFilter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.user; + +import java.io.IOException; +import java.util.Map; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.iterators.Filter; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; + +public class FateKeyFilter extends Filter { + + private FateKey.FateKeyType type; + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, + IteratorEnvironment env) throws IOException { + super.init(source, options, env); + type = FateKey.FateKeyType.valueOf(options.get("type")); + } + + @Override + public boolean accept(Key k, Value v) { + return type.equals(FateKey.deserializeType(v.get())); + } + + public static void configureScanner(ScannerBase scanner, FateKey.FateKeyType type) { + var iterSettings = new IteratorSetting(100, "keyfilter", FateKeyFilter.class); + iterSettings.addOption("type", type.toString()); + scanner.addScanIterator(iterSettings); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index c0e6623a93..0ae7ce892e 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -177,9 +177,9 @@ public class UserFateStore<T> extends AbstractFateStore<T> { Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY); scanner.setRange(new Range()); TxColumnFamily.TX_KEY_COLUMN.fetch(scanner); + FateKeyFilter.configureScanner(scanner, type); return scanner.stream().onClose(scanner::close) - .map(e -> FateKey.deserialize(e.getValue().get())) - .filter(fateKey -> fateKey.getType() == type); + .map(e -> FateKey.deserialize(e.getValue().get())); } catch (TableNotFoundException e) { throw new IllegalStateException(tableName + " not found!", e); }