This is an automated email from the ASF dual-hosted git repository. atri pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 4e595a2 Implement Real Time Mutable FST (#8016) 4e595a2 is described below commit 4e595a24267a0521420c5401ca99d892e076a3fe Author: Atri Sharma <atri.j...@gmail.com> AuthorDate: Mon Jan 31 14:36:13 2022 +0530 Implement Real Time Mutable FST (#8016) This PR implements a real-time mutable FST, allowing Pinot to perform real-time full-text searches. A mutable FST can be written to one word at a time; it neither needs the entire input set to be available beforehand nor requires the input to be sorted. Mutable FST supports concurrent read and write, i.e. a single thread can write to the FST while another thread reads from it. Mutable FST supports real-time updates to searches, reflecting data as it is added. It does not require a flush operation. Please see the design document at: https://docs.google.com/document/d/1O2ttsplFAflkM1Q-8-7yRNrD9EgCCWYm-W63NdC7ghE/edit?usp=sharing Follow-up work: converting a mutable FST to an immutable FST, and integration with the FST index. 
--- .../org/apache/pinot/perf/BenchmarkMutableFST.java | 101 ++++++++++++ .../utils/nativefst/mutablefst/MutableArc.java | 70 ++++++++ .../utils/nativefst/mutablefst/MutableFST.java | 66 ++++++++ .../utils/nativefst/mutablefst/MutableFSTImpl.java | 176 +++++++++++++++++++++ .../utils/nativefst/mutablefst/MutableState.java | 142 +++++++++++++++++ .../mutablefst/utils/MutableFSTUtils.java | 82 ++++++++++ .../nativefst/utils/RealTimeRegexpMatcher.java | 144 +++++++++++++++++ .../mutablefst/MutableFSTConcurrentTest.java | 146 +++++++++++++++++ .../nativefst/mutablefst/MutableFSTImplTest.java | 165 +++++++++++++++++++ .../nativefst/mutablefst/MutableFSTSanityTest.java | 94 +++++++++++ 10 files changed, 1186 insertions(+) diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkMutableFST.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkMutableFST.java new file mode 100644 index 0000000..d814767 --- /dev/null +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkMutableFST.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.perf; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Objects; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import org.apache.lucene.util.fst.FST; +import org.apache.pinot.segment.local.utils.fst.FSTBuilder; +import org.apache.pinot.segment.local.utils.fst.RegexpMatcher; +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableFST; +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableFSTImpl; +import org.apache.pinot.segment.local.utils.nativefst.utils.RealTimeRegexpMatcher; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.roaringbitmap.RoaringBitmapWriter; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@Warmup(iterations = 3, time = 30) +@Measurement(iterations = 5, time = 30) +@Fork(1) +@State(Scope.Benchmark) +public class BenchmarkMutableFST { + @Param({"q.[aeiou]c.*", ".*a", "b.*", ".*", ".*ated", ".*ba.*"}) + public String _regex; + private MutableFST _mutableFST; + private FST _fst; + + @Setup + public void setUp() + throws IOException { + SortedMap<String, Integer> input = new TreeMap<>(); + + _mutableFST = new MutableFSTImpl(); + try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader( + 
Objects.requireNonNull(getClass().getClassLoader().getResourceAsStream("data/words.txt"))))) { + String currentWord; + int i = 0; + while ((currentWord = bufferedReader.readLine()) != null) { + _mutableFST.addPath(currentWord, i); + input.put(currentWord, i++); + } + } + + _fst = FSTBuilder.buildFST(input); + } + + @Benchmark + public MutableRoaringBitmap testMutableRegex() { + RoaringBitmapWriter<MutableRoaringBitmap> writer = RoaringBitmapWriter.bufferWriter().get(); + RealTimeRegexpMatcher.regexMatch(_regex, _mutableFST, writer::add); + + return writer.get(); + } + + @Benchmark + public List testLuceneRegex() + throws IOException { + return RegexpMatcher.regexMatch(_regex, _fst); + } + + public static void main(String[] args) + throws Exception { + new Runner(new OptionsBuilder().include(BenchmarkFST.class.getSimpleName()).build()).run(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableArc.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableArc.java new file mode 100644 index 0000000..88da400 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableArc.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.segment.local.utils.nativefst.mutablefst; + +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.utils.MutableFSTUtils; + + +/** + * A mutable FST's arc + */ +public class MutableArc { + + private int _outputSymbol; + private MutableState _nextState; + + /** + * Arc Constructor + * + * @param nextState the arc's next state + */ + public MutableArc(int outputSymbol, MutableState nextState) { + _outputSymbol = outputSymbol; + _nextState = nextState; + } + + public int getOutputSymbol() { + return _outputSymbol; + } + + /** + * Get the next state + */ + public MutableState getNextState() { + return _nextState; + } + + @Override + public boolean equals(Object obj) { + return MutableFSTUtils.arcEquals(this, obj); + } + + @Override + public int hashCode() { + int result = 1; + result = 31 * result + (_nextState != null ? _nextState.getLabel() : 0); + return result; + } + + @Override + public String toString() { + return "(" + _nextState.toString() + ")"; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFST.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFST.java new file mode 100644 index 0000000..7418166 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFST.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.segment.local.utils.nativefst.mutablefst; + + +/** + * A mutable FST represents a FST which can have arbitrary inputs added to it at + * any given point of time. Unlike a normal FST which is build once read many, mutable + * FST can be concurrently written to and read from. Mutable FST provides real time search + * i.e. search will see words as they are added without needing a flush. + * + * Unlike a normal FST, mutable FST does not require the entire input beforehand nor + * does it require the input to be sorted. Single word additions work well with + * mutable FST. + * + * The reason as to why normal FST and mutable FST have different interfaces is because + * normal i.e. immutable FST is optimized for storage and represents arcs, nodes and labels + * by offsets. Thus, all operations are done in integer offsets. + * + * Unlike a normal FST, mutable FST has all components mutable i.e. arc and node. Thus, a + * mutable FST operates on different data structures and follows a slightly different + * interface. + * + * However, functionality wise, the interfaces for immutable FST and mutable FST are + * identical with no difference. 
+ */ +public interface MutableFST { + + /** + * The start state in the FST; there must be exactly one + * @return + */ + MutableState getStartState(); + + /** + * Set the start state + */ + void setStartState(MutableState mutableState); + + /** + * throws an exception if the FST is constructed in an invalid state + */ + void throwIfInvalid(); + + /** + * Add a path to the FST + */ + void addPath(String word, int outputSymbol); +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTImpl.java new file mode 100644 index 0000000..3d11d42 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTImpl.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pinot.segment.local.utils.nativefst.mutablefst; + +import com.google.common.base.Preconditions; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.utils.MutableFSTUtils; + + +/** + * A mutable finite state transducer implementation that allows you to build mutable via the API. + * This is not thread safe; convert to an ImmutableFst if you need to share across multiple writer + * threads. + * + * Concurrently writing and reading to/from a mutable FST is supported. + */ +public class MutableFSTImpl implements MutableFST { + private MutableState _start; + + public MutableFSTImpl() { + _start = new MutableState(true); + } + + /** + * Get the initial states + */ + @Override + public MutableState getStartState() { + return _start; + } + + /** + * Set the initial state + * + * @param start the initial state + */ + @Override + public void setStartState(MutableState start) { + Preconditions.checkState(_start != null, "Cannot override a start state"); + _start = start; + } + + public MutableState newStartState() { + return newStartState(); + } + + public MutableArc addArc(MutableState startState, int outputSymbol, MutableState endState) { + MutableArc newArc = new MutableArc(outputSymbol, endState); + startState.addArc(newArc); + endState.addIncomingState(startState); + return newArc; + } + + @Override + public void throwIfInvalid() { + Preconditions.checkNotNull(_start, "must have a start state"); + } + + @Override + public void addPath(String word, int outputSymbol) { + MutableState state = getStartState(); + + if (state == null) { + throw new IllegalStateException("Start state cannot be null"); + } + List<MutableArc> arcs = state.getArcs(); + boolean isFound = false; + for (MutableArc arc : arcs) { + if (arc.getNextState().getLabel() == word.charAt(0)) { 
+ state = arc.getNextState(); + isFound = true; + break; + } + } + int foundPos = -1; + if (isFound) { + Pair<MutableState, Integer> pair = findPointOfDiversion(state, word); + if (pair == null) { + // Word already exists + return; + } + foundPos = pair.getRight(); + state = pair.getLeft(); + } + for (int i = foundPos + 1; i < word.length(); i++) { + MutableState nextState = new MutableState(); + nextState.setLabel(word.charAt(i)); + + int currentOutputSymbol = -1; + if (i == word.length() - 1) { + currentOutputSymbol = outputSymbol; + } + + MutableArc mutableArc = new MutableArc(currentOutputSymbol, nextState); + state.addArc(mutableArc); + state = nextState; + } + state.setIsTerminal(true); + } + + private Pair<MutableState, Integer> findPointOfDiversion(MutableState mutableState, + String word) { + Queue<Pair<MutableState, Integer>> queue = new ArrayDeque<>(); + MutableState currentState = mutableState; + int pos = 0; + queue.add(Pair.of(mutableState, 0)); + while (!queue.isEmpty()) { + Pair<MutableState, Integer> pair = queue.remove(); + currentState = pair.getLeft(); + pos = pair.getRight(); + if (pos == word.length() - 1) { + return null; + } + if (currentState.getLabel() != word.charAt(pos)) { + throw new IllegalStateException("Current state needs to be part of word path"); + } + List<MutableArc> arcs = currentState.getArcs(); + for (MutableArc arc : arcs) { + if (arc.getNextState().getLabel() == word.charAt(pos + 1)) { + queue.add(Pair.of(arc.getNextState(), pos + 1)); + } + } + } + + return Pair.of(currentState, pos); + } + + static <T> void compactNulls(ArrayList<T> list) { + list.removeIf(Objects::isNull); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Fst(start=").append(_start).append(")"); + List<MutableArc> arcs = _start.getArcs(); + for (MutableArc arc : arcs) { + sb.append(" ").append(arc.toString()).append("\n"); + } + return sb.toString(); + } + + @Override + public boolean equals(Object o) { 
+ return MutableFSTUtils.fstEquals(this, o); + } + + @Override + public int hashCode() { + int result = 0; + result = 31 * result + (_start != null ? _start.hashCode() : 0); + return result; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableState.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableState.java new file mode 100644 index 0000000..1d16702 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableState.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.segment.local.utils.nativefst.mutablefst; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.utils.MutableFSTUtils; + + +/** + * The fst's mutable state implementation. 
+ * + * Holds its outgoing {@link MutableArc} objects in an ArrayList allowing additions/deletions + */ +public class MutableState { + + protected char _label; + + // Is terminal + protected boolean _isTerminal; + + // Is first state + protected boolean _isStartState; + + // Outgoing arcs + private final ArrayList<MutableArc> _arcs; + + // Incoming arcs (at least states with arcs that are incoming to us) + private final Set<MutableState> _incomingStates = Sets.newIdentityHashSet(); + + /** + * Default Constructor + */ + public MutableState() { + _arcs = Lists.newArrayList(); + } + + public MutableState(boolean isStartState) { + _isStartState = isStartState; + _arcs = Lists.newArrayList(); + } + + public boolean isTerminal() { + return _isTerminal; + } + + public boolean isStartState() { + return _isStartState; + } + + public char getLabel() { + return _label; + } + + public void setLabel(char label) { + _label = label; + } + + public void setIsTerminal(boolean isTerminal) { + _isTerminal = isTerminal; + } + + /** + * Get the number of outgoing arcs + */ + public int getArcCount() { + return _arcs.size(); + } + + /** + * Get an arc based on it's index the arcs ArrayList + * + * @param index the arc's index + * @return the arc + */ + public MutableArc getArc(int index) { + return _arcs.get(index); + } + + public List<MutableArc> getArcs() { + return _arcs; + } + + @Override + public String toString() { + return "(" + _label + ")"; + } + + // adds an arc but should only be used by MutableFst + void addArc(MutableArc arc) { + _arcs.add(arc); + } + + void addIncomingState(MutableState inState) { + if (inState == this) { + return; + } + _incomingStates.add(inState); + } + + void removeIncomingState(MutableState inState) { + _incomingStates.remove(inState); + } + + public Iterable<MutableState> getIncomingStates() { + return _incomingStates; + } + + @Override + public boolean equals(Object o) { + return MutableFSTUtils.stateEquals(this, o); + } + + @Override + public int 
hashCode() { + int result = _label; + long temp = 0; + result = 31 * result * ((int) (temp ^ (temp >>> 32))); + result = 31 * result + (_arcs != null ? _arcs.hashCode() : 0); + return result; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/utils/MutableFSTUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/utils/MutableFSTUtils.java new file mode 100644 index 0000000..7f41db5 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/utils/MutableFSTUtils.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pinot.segment.local.utils.nativefst.mutablefst.utils; + +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableArc; +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableFST; +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableState; +import org.apache.pinot.segment.local.utils.nativefst.utils.RealTimeRegexpMatcher; +import org.roaringbitmap.RoaringBitmapWriter; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + +public class MutableFSTUtils { + + private MutableFSTUtils() { + + } + + public static boolean fstEquals(Object thisFstObj, + Object thatFstObj) { + if (thisFstObj == thatFstObj) { + return true; + } + if (thisFstObj instanceof MutableFST && thatFstObj instanceof MutableFST) { + MutableFST thisFST = (MutableFST) thisFstObj; + MutableFST thatFST = (MutableFST) thatFstObj; + return thisFST.getStartState().getLabel() == thatFST.getStartState().getLabel() + && thisFST.getStartState().getArcs().size() == thatFST.getStartState().getArcs().size(); + } + return false; + } + + public static boolean arcEquals(Object thisArcObj, Object thatArcObj) { + if (thisArcObj == thatArcObj) { + return true; + } + if (thisArcObj instanceof MutableArc && thatArcObj instanceof MutableArc) { + MutableArc thisArc = (MutableArc) thisArcObj; + MutableArc thatArc = (MutableArc) thatArcObj; + return thisArc.getNextState().getLabel() == thatArc.getNextState().getLabel() + && thisArc.getNextState().getArcs().size() == thatArc.getNextState().getArcs().size(); + } + return false; + } + + public static boolean stateEquals(Object thisStateObj, Object thatStateObj) { + if (thisStateObj == thatStateObj) { + return true; + } + if (thisStateObj instanceof MutableState && thatStateObj instanceof MutableState) { + MutableState thisState = (MutableState) thisStateObj; + MutableState thatState = (MutableState) thatStateObj; + return thisState.getLabel() == thatState.getLabel() && thisState.getArcs().size() == 
thatState.getArcs().size(); + } + return false; + } + + /** + * Return number of matches for given regex for realtime FST + */ + public static long regexQueryNrHitsForRealTimeFST(String regex, MutableFST fst) { + RoaringBitmapWriter<MutableRoaringBitmap> writer = RoaringBitmapWriter.bufferWriter().get(); + RealTimeRegexpMatcher.regexMatch(regex, fst, writer::add); + return writer.get().getCardinality(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/utils/RealTimeRegexpMatcher.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/utils/RealTimeRegexpMatcher.java new file mode 100644 index 0000000..9c59d15 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/utils/RealTimeRegexpMatcher.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.utils.nativefst.utils; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import org.apache.pinot.segment.local.utils.nativefst.automaton.Automaton; +import org.apache.pinot.segment.local.utils.nativefst.automaton.CharacterRunAutomaton; +import org.apache.pinot.segment.local.utils.nativefst.automaton.RegExp; +import org.apache.pinot.segment.local.utils.nativefst.automaton.State; +import org.apache.pinot.segment.local.utils.nativefst.automaton.Transition; +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableArc; +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableFST; +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableState; +import org.roaringbitmap.IntConsumer; + +public class RealTimeRegexpMatcher { + private final String _regexQuery; + private final MutableFST _fst; + private final Automaton _automaton; + private final IntConsumer _dest; + + public RealTimeRegexpMatcher(String regexQuery, MutableFST fst, IntConsumer dest) { + _regexQuery = regexQuery; + _fst = fst; + _dest = dest; + _automaton = new RegExp(_regexQuery).toAutomaton(); + } + + public static void regexMatch(String regexQuery, MutableFST fst, IntConsumer dest) { + RealTimeRegexpMatcher matcher = new RealTimeRegexpMatcher(regexQuery, fst, dest); + matcher.regexMatchOnFST(); + } + + // Matches "input" string with _regexQuery Automaton. + public boolean match(String input) { + CharacterRunAutomaton characterRunAutomaton = new CharacterRunAutomaton(_automaton); + return characterRunAutomaton.run(input); + } + + /** + * This function runs matching on automaton built from regexQuery and the FST. + * FST stores key (string) to a value (Long). Both are state machines and state transition is based on + * a input character. + * + * This algorithm starts with Queue containing (Automaton Start Node, FST Start Node). 
+ * Each step an entry is popped from the queue: + * 1) if the automaton state is accept and the FST Node is final (i.e. end node) then the value stored for that FST + * is added to the set of result. + * 2) Else next set of transitions on automaton are gathered and for each transition target node for that character + * is figured out in FST Node, resulting pair of (automaton state, fst node) are added to the queue. + * 3) This process is bound to complete since we are making progression on the FST (which is a DAG) towards final + * nodes. + */ + public void regexMatchOnFST() { + final Queue<Path> queue = new ArrayDeque(); + + if (_automaton.getNumberOfStates() == 0) { + return; + } + // Automaton start state and FST start node is added to the queue. + queue.add(new Path(_automaton.getInitialState(), _fst.getStartState(), null, new ArrayList<>())); + Set<State> acceptStates = _automaton.getAcceptStates(); + while (!queue.isEmpty()) { + final Path path = queue.remove(); + // If automaton is in accept state and the fstNode is final (i.e. end node) then add the entry to endNodes which + // contains the result set. 
+ if (acceptStates.contains(path._state)) { + if (path._node.isTerminal()) { + //endNodes.add((long) _fst.getOutputSymbol(path._fstArc)); + _dest.accept(path._fstArc.getOutputSymbol()); + } + } + + Set<Transition> stateTransitions = path._state.getTransitionSet(); + for (Transition t : stateTransitions) { + final int min = t._min; + final int max = t._max; + if (min == max) { + MutableArc arc = getArcForLabel(path._node, t._min); + if (arc != null) { + queue.add(new Path(t._to, arc.getNextState(), arc, path._pathState)); + } + } else { + List<MutableArc> arcs = path._node.getArcs(); + for (MutableArc arc : arcs) { + char label = arc.getNextState().getLabel(); + if (label >= min && label <= max) { + queue.add(new Path(t._to, arc.getNextState(), arc, path._pathState)); + } + } + } + } + } + } + + private MutableArc getArcForLabel(MutableState mutableState, char label) { + List<MutableArc> arcs = mutableState.getArcs(); + for (MutableArc arc : arcs) { + if (arc.getNextState().getLabel() == label) { + return arc; + } + } + return null; + } + + /** + * Represents a path in the FST traversal directed by the automaton + */ + public final class Path { + public final State _state; + public final MutableState _node; + public final MutableArc _fstArc; + // Used for capturing the path taken till the point (for debugging) + public List<Character> _pathState; + + public Path(State state, MutableState node, MutableArc fstArc, List<Character> pathState) { + _state = state; + _node = node; + _fstArc = fstArc; + _pathState = pathState; + _pathState.add(node.getLabel()); + } + } +} \ No newline at end of file diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTConcurrentTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTConcurrentTest.java new file mode 100644 index 0000000..dee798f --- /dev/null +++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTConcurrentTest.java @@ -0,0 +1,146 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.segment.local.utils.nativefst.mutablefst;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.apache.pinot.segment.local.utils.nativefst.mutablefst.utils.MutableFSTUtils.regexQueryNrHitsForRealTimeFST;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;


/**
 * Runs one writer task and one reader task against the same {@link MutableFST} at the same time.
 *
 * <p>The writer adds a fixed list of words one at a time, pausing between additions. The reader repeatedly queries
 * for each word and records every word it was able to find. The test passes once the reader has recorded all words.
 */
public class MutableFSTConcurrentTest {
  /** Longest time the test waits for both workers; a stuck worker fails the test instead of hanging the build. */
  private static final long WORKER_TIMEOUT_SECONDS = 30;

  private ExecutorService _threadPool;

  // Counted down once by the reader and once by the writer when they finish, whether normally or with an exception.
  private CountDownLatch _countDownLatch;
  // Words the reader has found so far. Only the reader task mutates it; the test thread reads it after the latch opens.
  private Set<String> _resultSet;

  @BeforeClass
  private void setup() {
    _threadPool = Executors.newFixedThreadPool(2);
    _countDownLatch = new CountDownLatch(2);
    _resultSet = new HashSet<>();
  }

  @AfterClass
  private void shutDown() {
    // Interrupt any worker that is still sleeping (only possible after a timeout) rather than leaving it behind.
    _threadPool.shutdownNow();
  }

  /**
   * Starts the reader and the writer, waits for both to finish and verifies that the reader saw every word.
   *
   * @throws InterruptedException if the test thread is interrupted while waiting for the workers
   */
  @Test
  public void testConcurrentWriteAndRead()
      throws InterruptedException {
    MutableFST mutableFST = new MutableFSTImpl();
    List<String> words = new ArrayList<>();

    words.add("ab");
    words.add("abba");
    words.add("aba");
    words.add("bab");
    words.add("cdd");
    words.add("efg");

    // Pair each word with a distinct value, starting at 1.
    List<Pair<String, Integer>> wordsWithMetadata = new ArrayList<>();
    int i = 1;

    for (String currentWord : words) {
      wordsWithMetadata.add(Pair.of(currentWord, i));
      i++;
    }

    Future<?> readerTask = _threadPool.submit(() -> {
      try {
        performReads(mutableFST, words, 10, 200);
      } catch (InterruptedException e) {
        // Preserve the interrupt for the pool thread instead of swallowing it.
        Thread.currentThread().interrupt();
      }
    });

    Future<?> writerTask = _threadPool.submit(() -> {
      try {
        performWrites(mutableFST, wordsWithMetadata, 10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    // A successful await() happens-after both countDown() calls, so the reader's updates to _resultSet are visible.
    assertTrue("Reader and writer did not finish within " + WORKER_TIMEOUT_SECONDS + " seconds",
        _countDownLatch.await(WORKER_TIMEOUT_SECONDS, TimeUnit.SECONDS));

    // Surface any exception thrown inside a worker; otherwise it would stay hidden inside the Future.
    rethrowWorkerFailure(readerTask);
    rethrowWorkerFailure(writerTask);

    // AssertJUnit takes (expected, actual).
    assertEquals(words.size(), _resultSet.size());

    for (String word : words) {
      assertTrue(word + " not found in result set", _resultSet.contains(word));
    }
  }

  /**
   * Waits for a worker that has already counted down the latch and converts a failure inside it into a test failure.
   *
   * @param worker the future returned when the worker was submitted
   * @throws InterruptedException if the test thread is interrupted while waiting
   */
  private static void rethrowWorkerFailure(Future<?> worker)
      throws InterruptedException {
    try {
      worker.get();
    } catch (ExecutionException e) {
      throw new AssertionError("Worker task failed", e.getCause());
    }
  }

  /**
   * Polls the FST for every word not yet found, up to {@code count} rounds, sleeping between rounds.
   *
   * <p>A word is recorded in the result set when the query helper reports exactly one hit for it. Polling stops
   * early once every word has been recorded. The latch is always counted down on exit.
   *
   * @param fst the FST being written to concurrently
   * @param words the words to look for
   * @param count maximum number of polling rounds
   * @param sleepTime pause between rounds, in milliseconds
   * @throws InterruptedException if interrupted while sleeping
   */
  private void performReads(MutableFST fst, List<String> words, int count,
      long sleepTime)
      throws InterruptedException {
    try {
      for (int i = 0; i < count; i++) {
        if (_resultSet.size() == words.size()) {
          break;
        }

        for (String currentWord : words) {
          if (_resultSet.contains(currentWord)) {
            continue;
          }

          if (regexQueryNrHitsForRealTimeFST(currentWord, fst) == 1) {
            _resultSet.add(currentWord);
          }
        }

        Thread.sleep(sleepTime);
      }
    } finally {
      // Always release the latch so a failure here cannot leave the test thread waiting.
      _countDownLatch.countDown();
    }
  }

  /**
   * Adds each word with its value to the FST, sleeping after every addition. The latch is always counted down on
   * exit.
   *
   * @param fst the FST to write to
   * @param wordsAndMetadata the (word, value) pairs to add, in order
   * @param sleepTime pause after each addition, in milliseconds
   * @throws InterruptedException if interrupted while sleeping
   */
  private void performWrites(MutableFST fst, List<Pair<String, Integer>> wordsAndMetadata,
      long sleepTime)
      throws InterruptedException {
    try {
      for (Pair<String, Integer> currentPair : wordsAndMetadata) {
        fst.addPath(currentPair.getLeft(), currentPair.getRight());

        Thread.sleep(sleepTime);
      }
    } finally {
      _countDownLatch.countDown();
    }
  }
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTImplTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTImplTest.java new file mode 100644 index 0000000..afc6fcd --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTImplTest.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.segment.local.utils.nativefst.mutablefst; + +import java.util.ArrayList; +import java.util.List; +import org.apache.pinot.segment.local.utils.nativefst.utils.RealTimeRegexpMatcher; +import org.roaringbitmap.RoaringBitmapWriter; +import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import org.testng.collections.Lists; + +import static org.apache.pinot.segment.local.utils.nativefst.mutablefst.utils.MutableFSTUtils.regexQueryNrHitsForRealTimeFST; +import static org.testng.AssertJUnit.assertEquals; + + +public class MutableFSTImplTest { + private MutableFST _fst; + + @BeforeClass + public void setUp() + throws Exception { + _fst = new MutableFSTImpl(); + + String regexTestInputString = + "the quick brown fox jumps over the lazy ???" + "dog dddddd 493432 49344 [foo] 12.3 uick \\foo\\"; + String[] splitArray = regexTestInputString.split("\\s+"); + + for (String currentValue : splitArray) { + _fst.addPath(currentValue, -1); + } + } + + @Test + public void shouldCompactNulls1() { + List<Integer> listGood = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9); + List<Integer> listBad = Lists.newArrayList(null, 1, 2, null, 3, 4, null, 5, 6, null, 7, 8, 9, null); + MutableFSTImpl.compactNulls((ArrayList) listBad); + assertEquals(listGood, listBad); + } + + @Test + public void shouldCompactNulls2() { + ArrayList<Integer> listBad = (ArrayList) Lists.newArrayList(1); + MutableFSTImpl.compactNulls(listBad); + assertEquals(Lists.newArrayList(1), listBad); + } + + + @Test + public void testRegexMatcherPrefix() { + MutableFST fst = new MutableFSTImpl(); + + fst.addPath("he", 127); + fst.addPath("hp", 136); + + RoaringBitmapWriter<MutableRoaringBitmap> writer = RoaringBitmapWriter.bufferWriter().get(); + RealTimeRegexpMatcher.regexMatch("h.*", fst, writer::add); + + Assert.assertEquals(writer.get().getCardinality(), 2); + } + + @Test + public void 
testRegexMatcherSuffix() { + MutableFST fst = new MutableFSTImpl(); + + fst.addPath("aeh", 127); + fst.addPath("pfh", 136); + + RoaringBitmapWriter<MutableRoaringBitmap> writer = RoaringBitmapWriter.bufferWriter().get(); + RealTimeRegexpMatcher.regexMatch(".*h", fst, writer::add); + + Assert.assertEquals(writer.get().getCardinality(), 2); + } + + @Test + public void testRegexMatcherSuffix2() { + MutableFST fst = new MutableFSTImpl(); + + fst.addPath("hello-world", 12); + fst.addPath("hello-world123", 21); + fst.addPath("still", 123); + + RoaringBitmapWriter<MutableRoaringBitmap> writer = RoaringBitmapWriter.bufferWriter().get(); + RealTimeRegexpMatcher.regexMatch(".*123", fst, writer::add); + + Assert.assertEquals(writer.get().getCardinality(), 1); + + writer.reset(); + + RealTimeRegexpMatcher.regexMatch(".till", fst, writer::add); + + Assert.assertEquals(writer.get().getCardinality(), 1); + } + + @Test + public void testRegexMatcherMatchAny() { + MutableFST fst = new MutableFSTImpl(); + + fst.addPath("hello-world", 12); + fst.addPath("hello-world123", 21); + fst.addPath("still", 123); + + RoaringBitmapWriter<MutableRoaringBitmap> writer = RoaringBitmapWriter.bufferWriter().get(); + RealTimeRegexpMatcher.regexMatch("hello.*123", fst, writer::add); + + Assert.assertEquals(writer.get().getCardinality(), 1); + + writer.reset(); + RealTimeRegexpMatcher.regexMatch("hello.*", fst, writer::add); + + Assert.assertEquals(writer.get().getCardinality(), 2); + } + + @Test + public void testRegexMatcherMatchQuestionMark() { + MutableFST fst = new MutableFSTImpl(); + + fst.addPath("car", 12); + fst.addPath("cars", 21); + + RoaringBitmapWriter<MutableRoaringBitmap> writer = RoaringBitmapWriter.bufferWriter().get(); + RealTimeRegexpMatcher.regexMatch("cars?", fst, writer::add); + + Assert.assertEquals(writer.get().getCardinality(), 2); + } + + @Test + public void testRegex1() { + Assert.assertEquals(regexQueryNrHitsForRealTimeFST("q.[aeiou]c.*", _fst), 1); + } + + @Test + public 
void testRegex2() { + Assert.assertEquals(regexQueryNrHitsForRealTimeFST(".[aeiou]c.*", _fst), 1); + Assert.assertEquals(regexQueryNrHitsForRealTimeFST("q.[aeiou]c.", _fst), 1); + } + + @Test + public void testCharacterClasses() { + Assert.assertEquals(regexQueryNrHitsForRealTimeFST("\\d*", _fst), 1); + Assert.assertEquals(regexQueryNrHitsForRealTimeFST("\\d{6}", _fst), 1); + Assert.assertEquals(regexQueryNrHitsForRealTimeFST("[a\\d]{6}", _fst), 1); + Assert.assertEquals(regexQueryNrHitsForRealTimeFST("\\d{2,7}", _fst), 1); + Assert.assertEquals(regexQueryNrHitsForRealTimeFST("\\d{4}", _fst), 0); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTSanityTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTSanityTest.java new file mode 100644 index 0000000..cf7e782 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTSanityTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.utils.nativefst.mutablefst; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.SortedMap; +import java.util.TreeMap; +import org.apache.pinot.segment.local.utils.fst.RegexpMatcher; +import org.roaringbitmap.RoaringBitmapWriter; +import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +public class MutableFSTSanityTest { + private MutableFST _mutableFST; + private org.apache.lucene.util.fst.FST _fst; + + @BeforeClass + public void setUp() + throws Exception { + _mutableFST = new MutableFSTImpl(); + + SortedMap<String, Integer> input = new TreeMap<>(); + try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader( + Objects.requireNonNull(getClass().getClassLoader().getResourceAsStream("data/words.txt"))))) { + String currentWord; + int i = 0; + while ((currentWord = bufferedReader.readLine()) != null) { + _mutableFST.addPath(currentWord, i); + input.put(currentWord, i++); + } + } + + _fst = org.apache.pinot.segment.local.utils.fst.FSTBuilder.buildFST(input); + } + + @Test + public void testRegex() + throws IOException { + for (String regex : new String[]{"q.[aeiou]c.*", "a.*", "b.*", ".*", ".*landau", "landau.*", ".*ated", ".*ed", + ".*pot.*", ".*a"}) { + testRegex(regex); + } + } + + private void testRegex(String regex) + throws IOException { + List<Long> nativeResults = regexQueryNrHitsWithResults(regex, _mutableFST); + List<Long> results = RegexpMatcher.regexMatch(regex, _fst); + nativeResults.sort(null); + results.sort(null); + assertEquals(nativeResults, results); + } + + /** + * Return all matches for given regex + */ + public static List<Long> regexQueryNrHitsWithResults(String regex, MutableFST fst) { + 
RoaringBitmapWriter<MutableRoaringBitmap> writer = RoaringBitmapWriter.bufferWriter().get(); + org.apache.pinot.segment.local.utils.nativefst.utils.RealTimeRegexpMatcher.regexMatch(regex, fst, writer::add); + MutableRoaringBitmap resultBitMap = writer.get(); + List<Long> resultList = new ArrayList<>(); + + for (int dictId : resultBitMap) { + resultList.add((long) dictId); + } + + return resultList; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org