This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch json-indexing
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit bdfd3f105d170f3dc02f0a25feb3b922fe3ffc6c
Author: kishoreg <g.kish...@gmail.com>
AuthorDate: Sun Oct 18 22:05:51 2020 -0700

    Adding index creator and reader
---
 .../io/util/VarLengthBytesValueReaderWriter.java   |  28 +-
 .../operator/filter/BitmapBasedFilterOperator.java |   4 +-
 .../core/operator/filter/FilterOperatorUtils.java  |   2 +-
 .../operator/filter/JSONMatchFilterOperator.java   | 138 +++++
 .../transform/function/CastTransformFunction.java  |   2 +-
 .../{Predicate.java => JSONMatchPredicate.java}    |  57 +-
 .../query/request/context/predicate/Predicate.java |   3 +-
 .../core/segment/creator/impl/V1Constants.java     |   1 +
 .../segment/creator/impl/inv/JSONIndexCreator.java | 656 +++++++++++++++++++++
 .../creator/impl/inv/NestedObjectIndexCreator.java | 158 -----
 .../segment/index/loader/IndexLoadingConfig.java   |  10 +
 .../segment/index/loader/SegmentPreProcessor.java  |   6 +
 .../loader/invertedindex/JSONIndexHandler.java     | 205 +++++++
 .../segment/index/readers/JSONIndexReader.java     | 147 +++++
 .../pinot/core/segment/store/ColumnIndexType.java  |   3 +-
 .../core/segment/store/FilePerIndexDirectory.java  |   5 +
 .../pinot/spi/config/table/IndexingConfig.java     |   9 +
 .../java/org/apache/pinot/spi/data/FieldSpec.java  |   2 +-
 18 files changed, 1249 insertions(+), 187 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
index 5a6a25a..88f56f6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.core.io.util;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.Arrays;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@@ -63,19 +66,19 @@ public class VarLengthBytesValueReaderWriter implements ValueReader {
   /**
    * Magic bytes used to identify the dictionary files written in variable length bytes format.
    */
-  private static final byte[] MAGIC_BYTES = StringUtil.encodeUtf8(".vl;");
+  public static final byte[] MAGIC_BYTES = StringUtil.encodeUtf8(".vl;");
 
   /**
    * Increment this version if there are any structural changes in the store format and
    * deal with backward compatibility correctly based on old versions.
    */
-  private static final int VERSION = 1;
+  public static final int VERSION = 1;
 
   // Offsets of different fields in the header. Having as constants for readability.
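+  // Header layout (a sketch inferred from the constants below; 16 bytes total):
+  //   bytes [0..3]   MAGIC_BYTES ".vl;"
+  //   bytes [4..7]   VERSION
+  //   bytes [8..11]  number of elements
+  //   bytes [12..15] start offset of the data section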
-  private static final int VERSION_OFFSET = MAGIC_BYTES.length;
-  private static final int NUM_ELEMENTS_OFFSET = VERSION_OFFSET + Integer.BYTES;
-  private static final int DATA_SECTION_OFFSET_POSITION = NUM_ELEMENTS_OFFSET + Integer.BYTES;
-  private static final int HEADER_LENGTH = DATA_SECTION_OFFSET_POSITION + Integer.BYTES;
+  public static final int VERSION_OFFSET = MAGIC_BYTES.length;
+  public static final int NUM_ELEMENTS_OFFSET = VERSION_OFFSET + Integer.BYTES;
+  public static final int DATA_SECTION_OFFSET_POSITION = NUM_ELEMENTS_OFFSET + Integer.BYTES;
+  public static final int HEADER_LENGTH = DATA_SECTION_OFFSET_POSITION + Integer.BYTES;
 
   private final PinotDataBuffer _dataBuffer;
 
@@ -144,6 +147,19 @@ public class VarLengthBytesValueReaderWriter implements ValueReader {
     return false;
   }
 
+  public static byte[] getHeaderBytes(int numElements)
+      throws IOException {
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream(HEADER_LENGTH);
+    DataOutputStream dos = new DataOutputStream(out);
+    dos.write(MAGIC_BYTES);
+    dos.writeInt(VERSION);
+    dos.writeInt(numElements);
+    dos.writeInt(HEADER_LENGTH);
+    dos.close();
+    return out.toByteArray();
+  }
+
   private void writeHeader() {
     for (int offset = 0; offset < MAGIC_BYTES.length; offset++) {
       _dataBuffer.putByte(offset, MAGIC_BYTES[offset]);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
index d9c25e1..9c307a3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
@@ -38,9 +38,9 @@ public class BitmapBasedFilterOperator extends BaseFilterOperator {
   private final boolean _exclusive;
   private final int _numDocs;
 
-  BitmapBasedFilterOperator(PredicateEvaluator predicateEvaluator, DataSource dataSource, int numDocs) {
+  public BitmapBasedFilterOperator(PredicateEvaluator predicateEvaluator, InvertedIndexReader invertedIndexReader, int numDocs) {
     _predicateEvaluator = predicateEvaluator;
-    _invertedIndexReader = dataSource.getInvertedIndex();
+    _invertedIndexReader = invertedIndexReader;
     _docIds = null;
     _exclusive = predicateEvaluator.isExclusive();
     _numDocs = numDocs;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
index 3bc676e..5076ef6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
@@ -65,7 +65,7 @@ public class FilterOperatorUtils {
       return new SortedIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
     }
     if (dataSource.getInvertedIndex() != null) {
-      return new BitmapBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
+      return new BitmapBasedFilterOperator(predicateEvaluator, dataSource.getInvertedIndex(), numDocs);
     }
     return new ScanBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JSONMatchFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JSONMatchFilterOperator.java
new file mode 100644
index 0000000..adb0550
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JSONMatchFilterOperator.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.predicate.EqPredicate;
+import org.apache.pinot.core.query.request.context.predicate.InPredicate;
+import org.apache.pinot.core.query.request.context.predicate.NotEqPredicate;
+import org.apache.pinot.core.query.request.context.predicate.NotInPredicate;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.segment.creator.impl.inv.JSONIndexCreator;
+import org.apache.pinot.core.segment.index.readers.JSONIndexReader;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+@SuppressWarnings("rawtypes")
+public class JSONMatchFilterOperator extends BaseFilterOperator {
+  private static final String OPERATOR_NAME = "JSONMatchFilterOperator";
+
+  private final JSONIndexReader _nestedObjectIndexReader;
+  private final int _numDocs;
+  private String _column;
+  private final FilterContext _filterContext;
+
+  public JSONMatchFilterOperator(String column, FilterContext filterContext, JSONIndexReader nestedObjectIndexReader,
+      int numDocs) {
+    _column = column;
+    _filterContext = filterContext;
+    _nestedObjectIndexReader = nestedObjectIndexReader;
+    _numDocs = numDocs;
+  }
+
+  @Override
+  protected FilterBlock getNextBlock() {
+    ImmutableRoaringBitmap docIds = process(_filterContext);
+    System.out.println("docIds = " + docIds);
+    return new FilterBlock(new BitmapDocIdSet(docIds, _numDocs));
+  }
+
+  private MutableRoaringBitmap process(FilterContext filterContext) {
+    // Walk the children of the FilterContext being processed (not the root
+    // _filterContext), so that recursion over nested AND/OR nodes is correct.
+    List<FilterContext> children = filterContext.getChildren();
+    MutableRoaringBitmap resultBitmap = null;
+
+    switch (filterContext.getType()) {
+      case AND:
+        for (FilterContext child : children) {
+          if (resultBitmap == null) {
+            resultBitmap = process(child);
+          } else {
+            resultBitmap.and(process(child));
+          }
+        }
+        break;
+      case OR:
+        for (FilterContext child : children) {
+          if (resultBitmap == null) {
+            resultBitmap = process(child);
+          } else {
+            resultBitmap.or(process(child));
+          }
+        }
+        break;
+      case PREDICATE:
+        Predicate predicate = filterContext.getPredicate();
+        Predicate newPredicate = null;
+        switch (predicate.getType()) {
+
+          case EQ:
+            EqPredicate eqPredicate = (EqPredicate) predicate;
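+            // The JSON index stores flattened "key|value" terms (joined with
+            // JSONIndexCreator.POSTING_LIST_KEY_SEPARATOR). Each predicate is
+            // therefore rewritten against this column's identifier, e.g. lhs
+            // "addresses.street" with value "main st" becomes the index term
+            // "addresses.street|main st".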
+            newPredicate = new EqPredicate(ExpressionContext.forIdentifier(_column),
+                eqPredicate.getLhs().getIdentifier() + JSONIndexCreator.POSTING_LIST_KEY_SEPARATOR + eqPredicate
+                    .getValue());
+            break;
+          case NOT_EQ:
+            NotEqPredicate nEqPredicate = (NotEqPredicate) predicate;
+            newPredicate = new NotEqPredicate(ExpressionContext.forIdentifier(_column),
+                nEqPredicate.getLhs().getIdentifier() + JSONIndexCreator.POSTING_LIST_KEY_SEPARATOR
+                    + nEqPredicate.getValue());
+            break;
+          case IN:
+            InPredicate inPredicate = (InPredicate) predicate;
+            List<String> newInValues = inPredicate.getValues().stream().map(
+                value -> inPredicate.getLhs().getIdentifier() + JSONIndexCreator.POSTING_LIST_KEY_SEPARATOR
+                    + value).collect(Collectors.toList());
+            newPredicate = new InPredicate(ExpressionContext.forIdentifier(_column), newInValues);
+            break;
+          case NOT_IN:
+            // Read the values from the NOT_IN predicate itself and preserve NOT_IN
+            // semantics in the rewrite (the original draft reused inPredicate, which
+            // did not compile, and built an InPredicate, which inverted the filter).
+            NotInPredicate notInPredicate = (NotInPredicate) predicate;
+            List<String> newNotInValues = notInPredicate.getValues().stream().map(
+                value -> notInPredicate.getLhs().getIdentifier() + JSONIndexCreator.POSTING_LIST_KEY_SEPARATOR
+                    + value).collect(Collectors.toList());
+            newPredicate = new NotInPredicate(ExpressionContext.forIdentifier(_column), newNotInValues);
+            break;
+          case IS_NULL:
+          case IS_NOT_NULL:
+            newPredicate = predicate;
+            break;
+          case RANGE:
+          case REGEXP_LIKE:
+          case TEXT_MATCH:
+            throw new UnsupportedOperationException("JSON_MATCH does not support RANGE, REGEXP_LIKE or TEXT_MATCH");
+        }
+
+        resultBitmap = _nestedObjectIndexReader.getMatchingDocIds(newPredicate);
+        break;
+    }
+    return resultBitmap;
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CastTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CastTransformFunction.java
index c826446..cb6227f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CastTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CastTransformFunction.java
@@ -30,7 +30,7 @@ import org.apache.pinot.core.operator.transform.transformer.datetime.SDFToSDFTra
 import org.apache.pinot.core.plan.DocIdSetPlanNode;
 
 
-public class CastTransformFunction extends BaseTransformFunction {
+ public class CastTransformFunction extends BaseTransformFunction {
   public static final String FUNCTION_NAME = "cast";
 
   private TransformFunction _transformFunction;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/JSONMatchPredicate.java
similarity index 50%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/JSONMatchPredicate.java
index 203e950..97ad787 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/JSONMatchPredicate.java
@@ -18,30 +18,55 @@
  */
 package org.apache.pinot.core.query.request.context.predicate;
 
+import java.util.Objects;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 
 
 /**
- * The {@code Predicate} class represents the predicate in the filter.
- * <p>Currently the query engine only accepts string literals as the right-hand side of the predicate, so we store the - * right-hand side of the predicate as string or list of strings. + * Predicate for JSON_MATCH. */ -public interface Predicate { - enum Type { - EQ, NOT_EQ, IN, NOT_IN, RANGE, REGEXP_LIKE, TEXT_MATCH, IS_NULL, IS_NOT_NULL; +public class JSONMatchPredicate implements Predicate { + private final ExpressionContext _lhs; + private final String _value; - public boolean isExclusive() { - return this == NOT_EQ || this == NOT_IN || this == IS_NOT_NULL; + public JSONMatchPredicate(ExpressionContext lhs, String value) { + _lhs = lhs; + _value = value; + } + + @Override + public Type getType() { + return Type.JSON_MATCH; + } + + @Override + public ExpressionContext getLhs() { + return _lhs; + } + + public String getValue() { + return _value; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof JSONMatchPredicate)) { + return false; } + JSONMatchPredicate that = (JSONMatchPredicate) o; + return Objects.equals(_lhs, that._lhs) && Objects.equals(_value, that._value); } - /** - * Returns the type of the predicate. - */ - Type getType(); + @Override + public int hashCode() { + return Objects.hash(_lhs, _value); + } - /** - * Returns the left-hand side expression of the predicate. - */ - ExpressionContext getLhs(); + @Override + public String toString() { + return "json_match(" + _lhs + ",'" + _value + "')"; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java index 203e950..a204fc6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java @@ -28,7 +28,7 @@ import org.apache.pinot.core.query.request.context.ExpressionContext; */ public interface Predicate { enum Type { - EQ, NOT_EQ, IN, NOT_IN, RANGE, REGEXP_LIKE, TEXT_MATCH, IS_NULL, IS_NOT_NULL; + EQ, NOT_EQ, IN, NOT_IN, RANGE, REGEXP_LIKE, TEXT_MATCH, IS_NULL, IS_NOT_NULL, JSON_MATCH; public boolean isExclusive() { return this == NOT_EQ || this == NOT_IN || this == IS_NOT_NULL; @@ -44,4 +44,5 @@ public interface Predicate { * Returns the left-hand side expression of the predicate. 
*/ ExpressionContext getLhs(); + } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java index 0498f8c..e745120 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java @@ -36,6 +36,7 @@ public class V1Constants { public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd"; public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd"; public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv"; + public static final String JSON_INDEX_FILE_EXTENSION = ".json.idx"; public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = ".bitmap.range"; public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom"; public static final String NULLVALUE_VECTOR_FILE_EXTENSION = ".bitmap.nullvalue"; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/JSONIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/JSONIndexCreator.java new file mode 100644 index 0000000..705f690 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/JSONIndexCreator.java @@ -0,0 +1,656 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.pinot.core.segment.creator.impl.inv;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.wnameless.json.flattener.JsonFlattener;
+import com.google.common.io.Files;
+import com.google.common.primitives.UnsignedBytes;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.pinot.core.common.BlockDocIdIterator;
+import org.apache.pinot.core.io.util.VarLengthBytesValueReaderWriter;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.filter.JSONMatchFilterOperator;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.predicate.EqPredicate;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.segment.index.readers.JSONIndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+public class JSONIndexCreator implements Closeable {
+
+  //separator used to join the key and value to create the posting-list key
+  public static String POSTING_LIST_KEY_SEPARATOR = "|";
+  static int FLUSH_THRESHOLD = 50_000;
+  static int VERSION = 1;
+  private final File flattenedDocId2RootDocIdMappingFile;
+  private final File postingListFile;
+  private File dictionaryheaderFile;
+  private File dictionaryOffsetFile;
+  private File dictionaryFile;
+  private File invertedIndexOffsetFile;
+  private File invertedIndexFile;
+  private File outputIndexFile;
+
+  private int docId = 0;
+  private int numFlatennedDocId = 0;
+  int chunkId = 0;
+
+  private DataOutputStream postingListWriter;
+  private DataOutputStream flattenedDocId2RootDocIdWriter;
+
+  Map<String, List<Integer>> postingListMap = new TreeMap<>();
+  List<Integer> flattenedDocIdList = new ArrayList<>();
+  List<Integer> postingListChunkOffsets = new ArrayList<>();
+  List<Integer> chunkLengths = new ArrayList<>();
+  private FieldSpec fieldSpec;
+
+  public JSONIndexCreator(File indexDir, FieldSpec fieldSpec, FieldSpec.DataType valueType)
+      throws IOException {
+    this.fieldSpec = fieldSpec;
+    System.out.println("indexDir = " + indexDir);
+
+    String name = fieldSpec.getName();
+    // Use the (parent, child) File constructor so the file lands inside indexDir,
+    // consistent with the other intermediate files below (the original concatenated
+    // the directory path and the file name into a single sibling path).
+    postingListFile = new File(indexDir, name + "_postingList.buf");
+    postingListWriter = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(postingListFile)));
+    postingListChunkOffsets.add(postingListWriter.size());
+
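+    // Intermediate artifacts written alongside the index: posting-list chunks
+    // (flushed every FLUSH_THRESHOLD docs), dictionary header/offset/data buffers,
+    // inverted-index offset/data buffers, and the flattenedDocId -> rootDocId
+    // mapping. seal() stitches them into the single output file below.
+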
dictionaryheaderFile = new File(indexDir, name + "_dictionaryHeader.buf"); + dictionaryOffsetFile = new File(indexDir, name + "_dictionaryOffset.buf"); + dictionaryFile = new File(indexDir, name + "_dictionary.buf"); + invertedIndexOffsetFile = new File(indexDir, name + "_invertedIndexOffset.buf"); + invertedIndexFile = new File(indexDir, name + "_invertedIndex.buf"); + flattenedDocId2RootDocIdMappingFile = new File(indexDir, name + "_flattenedDocId.buf"); + flattenedDocId2RootDocIdWriter = + new DataOutputStream(new BufferedOutputStream(new FileOutputStream(flattenedDocId2RootDocIdMappingFile))); + + //output file + outputIndexFile = new File(indexDir, name + ".nested.idx"); + } + + public void add(byte[] data) + throws IOException { + + JsonNode jsonNode = new ObjectMapper().readTree(data); + List<Map<String, String>> flattenedMapList = unnestJson(jsonNode); + for (Map<String, String> map : flattenedMapList) { + // + for (Map.Entry<String, String> entry : map.entrySet()) { + //handle key posting list + String key = entry.getKey(); + + List<Integer> keyPostingList = postingListMap.get(key); + if (keyPostingList == null) { + keyPostingList = new ArrayList<>(); + postingListMap.put(key, keyPostingList); + } + keyPostingList.add(numFlatennedDocId); + + //handle keyvalue posting list + String keyValue = key + POSTING_LIST_KEY_SEPARATOR + entry.getValue(); + List<Integer> keyValuePostingList = postingListMap.get(keyValue); + if (keyValuePostingList == null) { + keyValuePostingList = new ArrayList<>(); + postingListMap.put(keyValue, keyValuePostingList); + } + keyValuePostingList.add(numFlatennedDocId); + } + //flattenedDocId2RootDocIdMapping + flattenedDocIdList.add(numFlatennedDocId); + + numFlatennedDocId++; + } + docId++; + + //flush data + if (docId % FLUSH_THRESHOLD == 0) { + flush(); + } + } + + /** + * Multi value + * @param dataArray + * @param length + * @throws IOException + */ + public void add(byte[][] dataArray, int length) + throws IOException { + + for (int i = 0; i < length; i++) { + byte[] data = dataArray[i]; + JsonNode jsonNode = new ObjectMapper().readTree(data); + List<Map<String, String>> flattenedMapList = unnestJson(jsonNode); + for (Map<String, String> map : flattenedMapList) { + // + for (Map.Entry<String, String> entry : map.entrySet()) { + //handle key posting list + String key = entry.getKey(); + + List<Integer> keyPostingList = postingListMap.get(key); + if (keyPostingList == null) { + keyPostingList = new ArrayList<>(); + postingListMap.put(key, keyPostingList); + } + keyPostingList.add(numFlatennedDocId); + + //handle keyvalue posting list + String keyValue = key + POSTING_LIST_KEY_SEPARATOR + entry.getValue(); + List<Integer> keyValuePostingList = postingListMap.get(keyValue); + if (keyValuePostingList == null) { + keyValuePostingList = new ArrayList<>(); + postingListMap.put(keyValue, keyValuePostingList); + } + keyValuePostingList.add(numFlatennedDocId); + } + //flattenedDocId2RootDocIdMapping + flattenedDocIdList.add(numFlatennedDocId); + + numFlatennedDocId++; + } + } + docId++; + + //flush data + if (docId % FLUSH_THRESHOLD == 0) { + flush(); + } + } + + public void seal() + throws IOException { + + flush(); + + flattenedDocId2RootDocIdWriter.close(); + postingListWriter.close(); + + //key posting list merging + System.out.println("InvertedIndex"); + System.out.println("================="); + + int maxKeyLength = createInvertedIndex(postingListFile, postingListChunkOffsets, chunkLengths); + System.out.println("================="); + + int flattenedDocid = 
0; + DataInputStream flattenedDocId2RootDocIdReader = + new DataInputStream(new BufferedInputStream(new FileInputStream(flattenedDocId2RootDocIdMappingFile))); + int[] rootDocIdArray = new int[numFlatennedDocId]; + while (flattenedDocid < numFlatennedDocId) { + rootDocIdArray[flattenedDocid++] = flattenedDocId2RootDocIdReader.readInt(); + } + System.out.println("FlattenedDocId to RootDocId Mapping = "); + System.out.println(Arrays.toString(rootDocIdArray)); + + //PUT all contents into one file + + //header + // version + maxDictionaryLength + [store the offsets + length for each one (dictionary offset file, dictionaryFile, index offset file, index file, flattened docId to rootDocId file)] + long headerSize = 2 * Integer.BYTES + 6 * 2 * Long.BYTES; + + long dataSize = + dictionaryheaderFile.length() + dictionaryOffsetFile.length() + dictionaryFile.length() + invertedIndexFile + .length() + invertedIndexOffsetFile.length() + flattenedDocId2RootDocIdMappingFile.length(); + + long totalSize = headerSize + dataSize; + PinotDataBuffer pinotDataBuffer = + PinotDataBuffer.mapFile(outputIndexFile, false, 0, totalSize, ByteOrder.BIG_ENDIAN, "Nested inverted index"); + + pinotDataBuffer.putInt(0, VERSION); + pinotDataBuffer.putInt(1 * Integer.BYTES, maxKeyLength); + long writtenBytes = headerSize; + + //add dictionary header + int bufferId = 0; + pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes); + pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, dictionaryheaderFile.length()); + pinotDataBuffer.readFrom(writtenBytes, dictionaryheaderFile, 0, dictionaryheaderFile.length()); + writtenBytes += dictionaryheaderFile.length(); + + //add dictionary offset + bufferId = bufferId + 1; + pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes); + pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, dictionaryOffsetFile.length()); + pinotDataBuffer.readFrom(writtenBytes, dictionaryOffsetFile, 0, dictionaryOffsetFile.length()); + writtenBytes += dictionaryOffsetFile.length(); + + //add dictionary + bufferId = bufferId + 1; + pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes); + pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, dictionaryFile.length()); + pinotDataBuffer.readFrom(writtenBytes, dictionaryFile, 0, dictionaryFile.length()); + writtenBytes += dictionaryFile.length(); + + //add index offset + bufferId = bufferId + 1; + pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes); + pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, invertedIndexOffsetFile.length()); + pinotDataBuffer.readFrom(writtenBytes, invertedIndexOffsetFile, 0, invertedIndexOffsetFile.length()); + writtenBytes += invertedIndexOffsetFile.length(); + + //add index data + bufferId = bufferId + 1; + pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes); + pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, invertedIndexFile.length()); + pinotDataBuffer.readFrom(writtenBytes, invertedIndexFile, 0, invertedIndexFile.length()); + writtenBytes += invertedIndexFile.length(); + + //add flattened docid to root doc id mapping + bufferId = bufferId + 1; + pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes); + pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, flattenedDocId2RootDocIdMappingFile.length()); + pinotDataBuffer + .readFrom(writtenBytes, flattenedDocId2RootDocIdMappingFile, 0, flattenedDocId2RootDocIdMappingFile.length()); + 
writtenBytes += flattenedDocId2RootDocIdMappingFile.length(); + } + + private long getBufferStartOffset(int bufferId) { + return 2 * Integer.BYTES + 2 * bufferId * Long.BYTES; + } + + private int createInvertedIndex(File postingListFile, List<Integer> postingListChunkOffsets, + List<Integer> chunkLengthList) + throws IOException { + + List<Iterator<ImmutablePair<byte[], int[]>>> chunkIterators = new ArrayList<>(); + + for (int i = 0; i < chunkLengthList.size(); i++) { + + final DataInputStream postingListFileReader = + new DataInputStream(new BufferedInputStream(new FileInputStream(postingListFile))); + postingListFileReader.skipBytes(postingListChunkOffsets.get(i)); + final int length = chunkLengthList.get(i); + chunkIterators.add(new Iterator<ImmutablePair<byte[], int[]>>() { + int index = 0; + + @Override + public boolean hasNext() { + return index < length; + } + + @Override + public ImmutablePair<byte[], int[]> next() { + try { + int keyLength = postingListFileReader.readInt(); + byte[] keyBytes = new byte[keyLength]; + postingListFileReader.read(keyBytes); + + int postingListLength = postingListFileReader.readInt(); + int[] postingList = new int[postingListLength]; + for (int i = 0; i < postingListLength; i++) { + postingList[i] = postingListFileReader.readInt(); + } + index++; + return ImmutablePair.of(keyBytes, postingList); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + } + final Comparator<byte[]> byteArrayComparator = UnsignedBytes.lexicographicalComparator(); + + PriorityQueue<ImmutablePair<Integer, ImmutablePair<byte[], int[]>>> queue = + new PriorityQueue<>(chunkLengthList.size(), + (o1, o2) -> byteArrayComparator.compare(o1.getRight().getLeft(), o2.getRight().getLeft())); + for (int i = 0; i < chunkIterators.size(); i++) { + Iterator<ImmutablePair<byte[], int[]>> iterator = chunkIterators.get(i); + if (iterator.hasNext()) { + queue.offer(ImmutablePair.of(i, iterator.next())); + } + } + byte[] prevKey = null; + RoaringBitmap roaringBitmap = new RoaringBitmap(); + + Writer writer = new Writer(dictionaryheaderFile, dictionaryOffsetFile, dictionaryFile, invertedIndexOffsetFile, + invertedIndexFile); + while (!queue.isEmpty()) { + ImmutablePair<Integer, ImmutablePair<byte[], int[]>> poll = queue.poll(); + byte[] currKey = poll.getRight().getLeft(); + if (prevKey != null && byteArrayComparator.compare(prevKey, currKey) != 0) { + System.out.println(new String(prevKey) + ":" + roaringBitmap); + writer.add(prevKey, roaringBitmap); + roaringBitmap.clear(); + } + + roaringBitmap.add(poll.getRight().getRight()); + prevKey = currKey; + + //add the next key from the chunk where the currKey was removed from + Iterator<ImmutablePair<byte[], int[]>> iterator = chunkIterators.get(poll.getLeft()); + if (iterator.hasNext()) { + queue.offer(ImmutablePair.of(poll.getLeft(), iterator.next())); + } + } + + if (prevKey != null) { + writer.add(prevKey, roaringBitmap); + } + writer.finish(); + return writer.getMaxDictionaryValueLength(); + } + + private void flush() + throws IOException { + //write the key (length|actual bytes) - posting list(length, flattenedDocIds) + System.out.println("postingListMap = " + postingListMap); + for (Map.Entry<String, List<Integer>> entry : postingListMap.entrySet()) { + byte[] keyBytes = entry.getKey().getBytes(Charset.forName("UTF-8")); + postingListWriter.writeInt(keyBytes.length); + postingListWriter.write(keyBytes); + List<Integer> flattenedDocIdList = entry.getValue(); + postingListWriter.writeInt(flattenedDocIdList.size()); + for (int 
flattenedDocId : flattenedDocIdList) { + postingListWriter.writeInt(flattenedDocId); + } + } + + //write flattened doc id to root docId mapping + for (int rootDocId : flattenedDocIdList) { + flattenedDocId2RootDocIdWriter.writeInt(rootDocId); + } + chunkLengths.add(postingListMap.size()); + postingListChunkOffsets.add(postingListWriter.size()); + postingListMap.clear(); + flattenedDocIdList.clear(); + } + + private static List<Map<String, String>> unnestJson(JsonNode root) { + Iterator<Map.Entry<String, JsonNode>> fields = root.fields(); + Map<String, String> flattenedSingleValuesMap = new TreeMap<>(); + Map<String, JsonNode> arrNodes = new TreeMap<>(); + Map<String, JsonNode> objectNodes = new TreeMap<>(); + List<Map<String, String>> resultList = new ArrayList<>(); + List<Map<String, String>> tempResultList = new ArrayList<>(); + while (fields.hasNext()) { + Map.Entry<String, JsonNode> child = fields.next(); + if (child.getValue().isValueNode()) { + //Normal value node + flattenedSingleValuesMap.put(child.getKey(), child.getValue().asText()); + } else if (child.getValue().isArray()) { + //Array Node: Process these nodes later + arrNodes.put(child.getKey(), child.getValue()); + } else { + //Object Node + objectNodes.put(child.getKey(), child.getValue()); + } + } + for (String objectNodeKey : objectNodes.keySet()) { + JsonNode objectNode = objectNodes.get(objectNodeKey); + modifyKeysInMap(flattenedSingleValuesMap, tempResultList, objectNodeKey, objectNode); + } + if (tempResultList.isEmpty()) { + tempResultList.add(flattenedSingleValuesMap); + } + if (!arrNodes.isEmpty()) { + for (Map<String, String> flattenedMapElement : tempResultList) { + for (String arrNodeKey : arrNodes.keySet()) { + JsonNode arrNode = arrNodes.get(arrNodeKey); + for (JsonNode arrNodeElement : arrNode) { + modifyKeysInMap(flattenedMapElement, resultList, arrNodeKey, arrNodeElement); + } + } + } + } else { + resultList.addAll(tempResultList); + } + return resultList; + } + + private static void modifyKeysInMap(Map<String, String> flattenedMap, List<Map<String, String>> resultList, + String arrNodeKey, JsonNode arrNode) { + List<Map<String, String>> objectResult = unnestJson(arrNode); + for (Map<String, String> flattenedObject : objectResult) { + Map<String, String> flattenedObjectCopy = new TreeMap<>(flattenedMap); + for (Map.Entry<String, String> entry : flattenedObject.entrySet()) { + flattenedObjectCopy.put(arrNodeKey + "." 
+ entry.getKey(), entry.getValue()); + } + resultList.add(flattenedObjectCopy); + } + } + + @Override + public void close() + throws IOException { + + } + + private class Writer { + private DataOutputStream _dictionaryHeaderWriter; + private DataOutputStream _dictionaryOffsetWriter; + private File _dictionaryOffsetFile; + private DataOutputStream _dictionaryWriter; + private DataOutputStream _invertedIndexOffsetWriter; + private File _invertedIndexOffsetFile; + private DataOutputStream _invertedIndexWriter; + private int _dictId; + private int _dictOffset; + private int _invertedIndexOffset; + int _maxDictionaryValueLength = Integer.MIN_VALUE; + + public Writer(File dictionaryheaderFile, File dictionaryOffsetFile, File dictionaryFile, + File invertedIndexOffsetFile, File invertedIndexFile) + throws IOException { + _dictionaryHeaderWriter = + new DataOutputStream(new BufferedOutputStream(new FileOutputStream(dictionaryheaderFile))); + + _dictionaryOffsetWriter = + new DataOutputStream(new BufferedOutputStream(new FileOutputStream(dictionaryOffsetFile))); + _dictionaryOffsetFile = dictionaryOffsetFile; + _dictionaryWriter = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(dictionaryFile))); + _invertedIndexOffsetWriter = + new DataOutputStream(new BufferedOutputStream(new FileOutputStream(invertedIndexOffsetFile))); + _invertedIndexOffsetFile = invertedIndexOffsetFile; + _invertedIndexWriter = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(invertedIndexFile))); + _dictId = 0; + _dictOffset = 0; + _invertedIndexOffset = 0; + } + + public void add(byte[] key, RoaringBitmap roaringBitmap) + throws IOException { + if (key.length > _maxDictionaryValueLength) { + _maxDictionaryValueLength = key.length; + } + //write the key to dictionary + _dictionaryOffsetWriter.writeInt(_dictOffset); + _dictionaryWriter.write(key); + + //write the roaringBitmap to inverted index + _invertedIndexOffsetWriter.writeInt(_invertedIndexOffset); + + int serializedSizeInBytes = roaringBitmap.serializedSizeInBytes(); + byte[] serializedRoaringBitmap = new byte[serializedSizeInBytes]; + ByteBuffer serializedRoaringBitmapBuffer = ByteBuffer.wrap(serializedRoaringBitmap); + roaringBitmap.serialize(serializedRoaringBitmapBuffer); + _invertedIndexWriter.write(serializedRoaringBitmap); + System.out.println( + "dictId = " + _dictId + ", dict offset:" + _dictOffset + ", valueLength:" + key.length + ", inv offset:" + + _invertedIndexOffset + ", serializedSizeInBytes:" + serializedSizeInBytes); + + //increment offsets + _dictOffset = _dictOffset + key.length; + _invertedIndexOffset = _invertedIndexOffset + serializedSizeInBytes; + //increment the dictionary id + _dictId = _dictId + 1; + } + + void finish() + throws IOException { + //InvertedIndexReader and VarlengthBytesValueReaderWriter needs one extra entry for offsets since it computes the length for index i using offset[i+1] - offset[i] + _invertedIndexOffsetWriter.writeInt(_invertedIndexOffset); + _dictionaryOffsetWriter.writeInt(_dictOffset); + + byte[] headerBytes = VarLengthBytesValueReaderWriter.getHeaderBytes(_dictId); + _dictionaryHeaderWriter.write(headerBytes); + System.out.println("headerBytes = " + Arrays.toString(headerBytes)); + + _dictionaryHeaderWriter.close(); + _dictionaryOffsetWriter.close(); + _dictionaryWriter.close(); + _invertedIndexOffsetWriter.close(); + _invertedIndexWriter.close(); + + //data offsets started with zero but the actual dictionary and index will contain (header + offsets + data). 
so all the offsets must be adjusted ( i.e add size(header) + size(offset) to each offset value) + PinotDataBuffer dictionaryOffsetBuffer = PinotDataBuffer + .mapFile(dictionaryOffsetFile, false, 0, _dictionaryOffsetFile.length(), ByteOrder.BIG_ENDIAN, + "dictionary offset file"); + int dictOffsetBase = _dictionaryHeaderWriter.size() + _dictionaryOffsetWriter.size(); + for (int i = 0; i < _dictId + 1; i++) { + int offset = dictionaryOffsetBuffer.getInt(i * Integer.BYTES); + int newOffset = offset + dictOffsetBase; + dictionaryOffsetBuffer.putInt(i * Integer.BYTES, offset + dictOffsetBase); + System.out.println("dictId = " + i + ", offset = " + offset + ", newOffset = " + newOffset); + } + + PinotDataBuffer invIndexOffsetBuffer = PinotDataBuffer + .mapFile(invertedIndexOffsetFile, false, 0, invertedIndexOffsetFile.length(), ByteOrder.BIG_ENDIAN, + "invertedIndexOffsetFile"); + int invIndexOffsetBase = _invertedIndexOffsetWriter.size(); + for (int i = 0; i < _dictId + 1; i++) { + int offset = invIndexOffsetBuffer.getInt(i * Integer.BYTES); + int newOffset = offset + invIndexOffsetBase; + System.out.println("offset = " + offset + ", newOffset = " + newOffset); + + invIndexOffsetBuffer.putInt(i * Integer.BYTES, newOffset); + } + + invIndexOffsetBuffer.close(); + dictionaryOffsetBuffer.close(); + } + + public int getMaxDictionaryValueLength() { + return _maxDictionaryValueLength; + } + } + + public static void main(String[] args) + throws Exception { + + String json0 = " { \"a\" : { \"b\" : 1, \"c\": 3, \"d\": [{\"x\" : 1}, {\"y\" : 1}] }, \"e\": \"f\", \"g\":2.3 }"; + String json1 = + " { \"name\" : \"adam\", \"age\": 30, \"country\": \"us\", \"address\": {\"number\" : 112, \"street\": \"main st\", \"country\": \"us\" } }"; + String json2 = " { \"name\" : \"adam\", \"age\": 30 }"; + String json3 = "{\n" + " \"name\" : \"adam\",\n" + " \"age\" : 30,\n" + " \"country\" : \"us\",\n" + + " \"addresses\" : [{\n" + " \"number\" : 1,\n" + " \"street\" : \"main st\",\n" + + " \"country\" : \"us\"\n" + " }, {\n" + " \"number\" : 2,\n" + " \"street\" : \"second st\",\n" + + " \"country\" : \"us\"\n" + " }, {\n" + " \"number\" : 3,\n" + " \"street\" : \"third st\",\n" + + " \"country\" : \"us\"\n" + " }]\n" + "}\n"; + + String json4 = + "{\n" + " \"year\": [\n" + " 2018\n" + " ],\n" + " \"customers\": [\n" + " {\n" + + " \"name\": \"John\",\n" + " \"contact\": [\n" + " {\n" + + " \"phone\": \"home\",\n" + " \"number\": \"333-3334\"\n" + + " }\n" + " ]\n" + " },\n" + " {\n" + + " \"name\": \"Jane\",\n" + " \"contact\": [\n" + " {\n" + + " \"phone\": \"home\",\n" + " \"number\": \"555-5556\"\n" + + " }\n" + " ],\n" + " \"surname\": \"Shaw\"\n" + " }\n" + + " ]\n" + "}"; + + String json5 = "{ \n" + " \"accounting\" : [ \n" + " { \"firstName\" : \"John\", \n" + + " \"lastName\" : \"Doe\",\n" + " \"age\" : 23 },\n" + "\n" + + " { \"firstName\" : \"Mary\", \n" + " \"lastName\" : \"Smith\",\n" + + " \"age\" : 32 }\n" + " ], \n" + + " \"sales\" : [ \n" + " { \"firstName\" : \"Sally\", \n" + + " \"lastName\" : \"Green\",\n" + " \"age\" : 27 },\n" + + "\n" + " { \"firstName\" : \"Jim\", \n" + + " \"lastName\" : \"Galley\",\n" + " \"age\" : 41 }\n" + + " ] \n" + "} "; + + String json = json3; + System.out.println("json = " + json); + JsonNode rawJsonNode = new ObjectMapper().readTree(json); + + System.out.println( + "rawJsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(rawJsonNode)); + String flattenJson = JsonFlattener.flatten(json); + + System.out.println("flattenJson = " + 
flattenJson);
+    JsonNode jsonNode = new ObjectMapper().readTree(flattenJson);
+
+    System.out
+        .println("jsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(jsonNode));
+    Map<String, Object> stringObjectMap = JsonFlattener.flattenAsMap(json);
+    System.out.println("JsonFlattener.flattenAsMap(json) = " + stringObjectMap);
+    FieldSpec fieldSpec = new DimensionFieldSpec();
+    fieldSpec.setName("person");
+    File tempDir = Files.createTempDir();
+    JSONIndexCreator creator = new JSONIndexCreator(tempDir, fieldSpec, FieldSpec.DataType.BYTES);
+    List<Map<String, String>> maps = creator.unnestJson(rawJsonNode);
+    System.out.println("maps = " + maps.toString().replaceAll("},", "}\n"));
+    creator.add(json.getBytes());
+
+    creator.seal();
+    System.out.println("Output Dir = " + tempDir);
+    System.out.println("FileUtils.listFiles(tempDir, null, true) = " + FileUtils.listFiles(tempDir, null, true).stream()
+        .map(file -> file.getName()).collect(Collectors.toList()));
+
+    //Test reader
+    PinotDataBuffer buffer =
+        PinotDataBuffer.mapReadOnlyBigEndianFile(new File(tempDir, fieldSpec.getName() + ".nested.idx"));
+    JSONIndexReader reader = new JSONIndexReader(buffer);
+    ExpressionContext lhs = ExpressionContext.forIdentifier("person");
+    Predicate predicate = new EqPredicate(lhs, "addresses.street" + POSTING_LIST_KEY_SEPARATOR + "third st");
+    MutableRoaringBitmap matchingDocIds = reader.getMatchingDocIds(predicate);
+    System.out.println("matchingDocIds = " + matchingDocIds);
+
+    //Test filter operator
+    FilterContext filterContext = QueryContextConverterUtils
+        .getFilter(CalciteSqlParser.compileToExpression("name='adam' AND addresses.street='main st'"));
+    int numDocs = 1;
+    JSONMatchFilterOperator operator = new JSONMatchFilterOperator("person", filterContext, reader, numDocs);
+    FilterBlock filterBlock = operator.nextBlock();
+    BlockDocIdIterator iterator = filterBlock.getBlockDocIdSet().iterator();
+    int docId = -1;
+    // Iterate until the EOF sentinel (a negative value); docId 0 is a valid match,
+    // so test with >= 0 rather than > 0, which would skip the first document.
+    while ((docId = iterator.next()) >= 0) {
+      System.out.println("docId = " + docId);
+    }
+  }
+}
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/NestedObjectIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/NestedObjectIndexCreator.java
deleted file mode 100644
index adfa19d..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/NestedObjectIndexCreator.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */ -package org.apache.pinot.core.segment.creator.impl.inv; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.github.wnameless.json.flattener.JsonFlattener; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import org.apache.pinot.spi.data.FieldSpec; - - -public class NestedObjectIndexCreator { - - public NestedObjectIndexCreator(File indexDir, FieldSpec fieldSpec, FieldSpec.DataType valueType) { - - } - - private static List<Map<String, String>> unnestJson(JsonNode root) { - Iterator<Map.Entry<String, JsonNode>> fields = root.fields(); - Map<String, String> flattenedSingleValuesMap = new TreeMap<>(); - Map<String, JsonNode> arrNodes = new TreeMap<>(); - Map<String, JsonNode> objectNodes = new TreeMap<>(); - List<Map<String, String>> resultList = new ArrayList<>(); - List<Map<String, String>> tempResultList = new ArrayList<>(); - while (fields.hasNext()) { - Map.Entry<String, JsonNode> child = fields.next(); - if (child.getValue().isValueNode()) { - //Normal value node - flattenedSingleValuesMap.put(child.getKey(), child.getValue().asText()); - } else if (child.getValue().isArray()) { - //Array Node: Process these nodes later - arrNodes.put(child.getKey(), child.getValue()); - } else { - //Object Node - objectNodes.put(child.getKey(), child.getValue()); - } - } - for (String objectNodeKey : objectNodes.keySet()) { - JsonNode objectNode = objectNodes.get(objectNodeKey); - modifyKeysInMap(flattenedSingleValuesMap, tempResultList, objectNodeKey, objectNode); - } - if (tempResultList.isEmpty()) { - tempResultList.add(flattenedSingleValuesMap); - } - if (!arrNodes.isEmpty()) { - for (Map<String, String> flattenedMapElement : tempResultList) { - for (String arrNodeKey : arrNodes.keySet()) { - JsonNode arrNode = arrNodes.get(arrNodeKey); - for (JsonNode arrNodeElement : arrNode) { - modifyKeysInMap(flattenedMapElement, resultList, arrNodeKey, arrNodeElement); - } - } - } - } else { - resultList.addAll(tempResultList); - } - return resultList; - } - - private static void modifyKeysInMap(Map<String, String> flattenedMap, List<Map<String, String>> resultList, - String arrNodeKey, JsonNode arrNode) { - List<Map<String, String>> objectResult = unnestJson(arrNode); - for (Map<String, String> flattenedObject : objectResult) { - Map<String, String> flattenedObjectCopy = new TreeMap<>(flattenedMap); - for (Map.Entry<String, String> entry : flattenedObject.entrySet()) { - flattenedObjectCopy.put(arrNodeKey + "." 
+ entry.getKey(), entry.getValue()); - } - resultList.add(flattenedObjectCopy); - } - } - - public void add(byte[] data) - throws IOException { - - JsonNode jsonNode = new ObjectMapper().readTree(data); - List<Map<String, String>> flattenedMapList = unnestJson(jsonNode); - for (Map<String, String> map : flattenedMapList) { - - } - } - - public static void main(String[] args) - throws IOException { - - String json0 = " { \"a\" : { \"b\" : 1, \"c\": 3, \"d\": [{\"x\" : 1}, {\"y\" : 1}] }, \"e\": \"f\", \"g\":2.3 }"; - String json1 = - " { \"name\" : \"adam\", \"age\": 30, \"country\": \"us\", \"address\": {\"number\" : 112, \"street\": \"main st\", \"country\": \"us\" } }"; - String json2 = " { \"name\" : \"adam\", \"age\": 30 }"; - String json3 = "{\n" + " \"name\" : \"adam\",\n" + " \"age\" : 30,\n" + " \"country\" : \"us\",\n" - + " \"addresses\" : [{\n" + " \"number\" : 1,\n" + " \"street\" : \"main st\",\n" - + " \"country\" : \"us\"\n" + " }, {\n" + " \"number\" : 2,\n" + " \"street\" : \"second st\",\n" - + " \"country\" : \"us\"\n" + " }, {\n" + " \"number\" : 3,\n" + " \"street\" : \"third st\",\n" - + " \"country\" : \"us\"\n" + " }]\n" + "}\n"; - - String json4 = - "{\n" + " \"year\": [\n" + " 2018\n" + " ],\n" + " \"customers\": [\n" + " {\n" - + " \"name\": \"John\",\n" + " \"contact\": [\n" + " {\n" - + " \"phone\": \"home\",\n" + " \"number\": \"333-3334\"\n" - + " }\n" + " ]\n" + " },\n" + " {\n" - + " \"name\": \"Jane\",\n" + " \"contact\": [\n" + " {\n" - + " \"phone\": \"home\",\n" + " \"number\": \"555-5556\"\n" - + " }\n" + " ],\n" + " \"surname\": \"Shaw\"\n" + " }\n" - + " ]\n" + "}"; - - String json5 = "{ \n" + " \"accounting\" : [ \n" + " { \"firstName\" : \"John\", \n" - + " \"lastName\" : \"Doe\",\n" + " \"age\" : 23 },\n" + "\n" - + " { \"firstName\" : \"Mary\", \n" + " \"lastName\" : \"Smith\",\n" - + " \"age\" : 32 }\n" + " ], \n" - + " \"sales\" : [ \n" + " { \"firstName\" : \"Sally\", \n" - + " \"lastName\" : \"Green\",\n" + " \"age\" : 27 },\n" - + "\n" + " { \"firstName\" : \"Jim\", \n" - + " \"lastName\" : \"Galley\",\n" + " \"age\" : 41 }\n" - + " ] \n" + "} "; - - String json = json3; - System.out.println("json = " + json); - JsonNode rawJsonNode = new ObjectMapper().readTree(json); - - System.out.println( - "rawJsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(rawJsonNode)); - String flattenJson = JsonFlattener.flatten(json); - - System.out.println("flattenJson = " + flattenJson); - JsonNode jsonNode = new ObjectMapper().readTree(flattenJson); - - System.out - .println("jsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(jsonNode)); - Map<String, Object> stringObjectMap = JsonFlattener.flattenAsMap(json); - System.out.println("JsonFlattener.flattenAsMap(json) = " + stringObjectMap); - NestedObjectIndexCreator creator = new NestedObjectIndexCreator(null, null, null); - List<Map<String, String>> maps = creator.unnestJson(rawJsonNode); - System.out.println("maps = " + maps.toString().replaceAll("},", "}\n")); -// creator.add(json.getBytes()); - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java index a6817a0..ecf7e30 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java @@ -47,6 +47,7 @@ 
 public class IndexLoadingConfig {
   private ReadMode _readMode = ReadMode.DEFAULT_MODE;
   private List<String> _sortedColumns = Collections.emptyList();
   private Set<String> _invertedIndexColumns = new HashSet<>();
+  private Set<String> _jsonIndexColumns = new HashSet<>();
   private Set<String> _textIndexColumns = new HashSet<>();
   private Set<String> _rangeIndexColumns = new HashSet<>();
   private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace this by _noDictionaryConfig.
@@ -92,6 +93,11 @@ public class IndexLoadingConfig {
       _invertedIndexColumns.addAll(invertedIndexColumns);
     }
 
+    List<String> jsonIndexColumns = indexingConfig.getJsonIndexColumns();
+    if (jsonIndexColumns != null) {
+      _jsonIndexColumns.addAll(jsonIndexColumns);
+    }
+
     List<String> rangeIndexColumns = indexingConfig.getRangeIndexColumns();
     if (rangeIndexColumns != null) {
       _rangeIndexColumns.addAll(rangeIndexColumns);
@@ -222,6 +228,10 @@ public class IndexLoadingConfig {
     return _invertedIndexColumns;
   }
 
+  public Set<String> getJsonIndexColumns() {
+    // Return the json index columns (the original copy-paste returned
+    // _invertedIndexColumns here, building json indices for the wrong columns).
+    return _jsonIndexColumns;
+  }
+
   public Set<String> getRangeIndexColumns() {
     return _rangeIndexColumns;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
index ca7f7e9..06a7adb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
@@ -31,6 +31,7 @@ import org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMax
 import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandler;
 import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandlerFactory;
 import org.apache.pinot.core.segment.index.loader.invertedindex.InvertedIndexHandler;
+import org.apache.pinot.core.segment.index.loader.invertedindex.JSONIndexHandler;
 import org.apache.pinot.core.segment.index.loader.invertedindex.RangeIndexHandler;
 import org.apache.pinot.core.segment.index.loader.invertedindex.TextIndexHandler;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
@@ -113,6 +114,11 @@ public class SegmentPreProcessor implements AutoCloseable {
         new RangeIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
     rangeIndexHandler.createRangeIndices();
 
+    // Create json indices according to the index config.
+    JSONIndexHandler jsonIndexHandler =
+        new JSONIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
+    jsonIndexHandler.createJsonIndices();
+
     Set<String> textIndexColumns = _indexLoadingConfig.getTextIndexColumns();
     if (textIndexColumns.size() > 0) {
       TextIndexHandler textIndexHandler =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/JSONIndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/JSONIndexHandler.java
new file mode 100644
index 0000000..90ca344
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/JSONIndexHandler.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.loader.invertedindex;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.segment.creator.impl.inv.JSONIndexCreator;
+import org.apache.pinot.core.segment.index.column.PhysicalColumnIndexContainer;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.core.segment.index.loader.LoaderUtils;
+import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.ForwardIndexReaderContext;
+import org.apache.pinot.core.segment.index.readers.forward.FixedBitMVForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.forward.FixedBitSVForwardIndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION;
+
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class JSONIndexHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(JSONIndexHandler.class);
+
+  private final File _indexDir;
+  private final SegmentDirectory.Writer _segmentWriter;
+  private final String _segmentName;
+  private final SegmentVersion _segmentVersion;
+  private final Set<ColumnMetadata> _jsonIndexColumns = new HashSet<>();
+
+  public JSONIndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
+      SegmentDirectory.Writer segmentWriter) {
+    _indexDir = indexDir;
+    _segmentWriter = segmentWriter;
+    _segmentName = segmentMetadata.getName();
+    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+
+    // Only create json index on dictionary-encoded unsorted columns
+    for (String column : indexLoadingConfig.getJsonIndexColumns()) {
+      ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
+      if (columnMetadata != null && !columnMetadata.isSorted()) {
+        _jsonIndexColumns.add(columnMetadata);
+      }
+    }
+  }
+
+  public void createJsonIndices()
+      throws IOException {
+    for (ColumnMetadata columnMetadata : _jsonIndexColumns) {
+      createJSONIndexForColumn(columnMetadata);
+    }
+  }
+
+  private void createJSONIndexForColumn(ColumnMetadata columnMetadata)
+      throws IOException {
+  private void createJSONIndexForColumn(ColumnMetadata columnMetadata)
+      throws IOException {
+    String column = columnMetadata.getColumnName();
+
+    File inProgress = new File(_indexDir, column + JSON_INDEX_FILE_EXTENSION + ".inprogress");
+    File jsonIndexFile = new File(_indexDir, column + JSON_INDEX_FILE_EXTENSION);
+
+    if (!inProgress.exists()) {
+      // Marker file does not exist, which means the last run ended normally.
+
+      if (_segmentWriter.hasIndexFor(column, ColumnIndexType.JSON_INDEX)) {
+        // Skip creating json index if it already exists.
+        LOGGER.info("Found json index for segment: {}, column: {}", _segmentName, column);
+        return;
+      }
+
+      // Create a marker file.
+      FileUtils.touch(inProgress);
+    } else {
+      // Marker file exists, which means the last run was interrupted.
+
+      // Remove the json index if it exists.
+      // For v1 and v2, it's the actual json index. For v3, it's the temporary json index.
+      FileUtils.deleteQuietly(jsonIndexFile);
+    }
+
+    // Create new json index for the column.
+    LOGGER.info("Creating new json index for segment: {}, column: {}", _segmentName, column);
+    if (columnMetadata.hasDictionary()) {
+      handleDictionaryBasedColumn(columnMetadata);
+    } else {
+      handleNonDictionaryBasedColumn(columnMetadata);
+    }
+
+    // For v3, write the generated json index file into the single file and remove it.
+    if (_segmentVersion == SegmentVersion.v3) {
+      LoaderUtils.writeIndexToV3Format(_segmentWriter, column, jsonIndexFile, ColumnIndexType.JSON_INDEX);
+    }
+
+    // Delete the marker file.
+    FileUtils.deleteQuietly(inProgress);
+
+    LOGGER.info("Created json index for segment: {}, column: {}", _segmentName, column);
+  }
+
+  private void handleDictionaryBasedColumn(ColumnMetadata columnMetadata)
+      throws IOException {
+    int numDocs = columnMetadata.getTotalDocs();
+    try (ForwardIndexReader forwardIndexReader = getForwardIndexReader(columnMetadata, _segmentWriter);
+        ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
+        Dictionary dictionary = getDictionaryReader(columnMetadata, _segmentWriter);
+        JSONIndexCreator jsonIndexCreator = new JSONIndexCreator(_indexDir, columnMetadata.getFieldSpec(),
+            FieldSpec.DataType.BYTES)) {
+      if (columnMetadata.isSingleValue()) {
+        // Single-value column
+        for (int i = 0; i < numDocs; i++) {
+          int dictId = forwardIndexReader.getDictId(i, readerContext);
+          jsonIndexCreator.add(dictionary.getBytesValue(dictId));
+        }
+      } else {
+        // Multi-value column
+        throw new IllegalStateException("JSON indexing is not supported on multi-value columns");
+      }
+      jsonIndexCreator.seal();
+    }
+  }
+
+  private void handleNonDictionaryBasedColumn(ColumnMetadata columnMetadata)
+      throws IOException {
+    FieldSpec.DataType dataType = columnMetadata.getDataType();
+    if (dataType != FieldSpec.DataType.BYTES && dataType != FieldSpec.DataType.STRING) {
+      throw new UnsupportedOperationException(
+          "JSON indexing is only supported for STRING/BYTES data type, but found: " + dataType);
+    }
+    int numDocs = columnMetadata.getTotalDocs();
+    try (ForwardIndexReader forwardIndexReader = getForwardIndexReader(columnMetadata, _segmentWriter);
+        ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
+        JSONIndexCreator jsonIndexCreator = new JSONIndexCreator(_indexDir, columnMetadata.getFieldSpec(),
+            dataType)) {
+      if (columnMetadata.isSingleValue()) {
+        // Single-value column.
+        switch (dataType) {
+          case STRING:
+          case BYTES:
+            for (int i = 0; i < numDocs; i++) {
+              jsonIndexCreator.add(forwardIndexReader.getBytes(i, readerContext));
+            }
+            break;
+          default:
+            throw new IllegalStateException("Unsupported data type: " + dataType);
+        }
+      } else {
+        // Multi-value column
+        throw new IllegalStateException("JSON indexing is not supported on multi-value columns");
+      }
+      jsonIndexCreator.seal();
+    }
+  }
+
+  private ForwardIndexReader<?> getForwardIndexReader(ColumnMetadata columnMetadata,
+      SegmentDirectory.Writer segmentWriter)
+      throws IOException {
+    PinotDataBuffer buffer = segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.FORWARD_INDEX);
+    int numRows = columnMetadata.getTotalDocs();
+    int numBitsPerValue = columnMetadata.getBitsPerElement();
+    if (columnMetadata.isSingleValue()) {
+      return new FixedBitSVForwardIndexReader(buffer, numRows, numBitsPerValue);
+    } else {
+      return new FixedBitMVForwardIndexReader(buffer, numRows, columnMetadata.getTotalNumberOfEntries(),
+          numBitsPerValue);
+    }
+  }
+
+  private BaseImmutableDictionary getDictionaryReader(ColumnMetadata columnMetadata,
+      SegmentDirectory.Writer segmentWriter)
+      throws IOException {
+    PinotDataBuffer buffer = segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.DICTIONARY);
+    return PhysicalColumnIndexContainer.loadDictionary(buffer, columnMetadata, false);
+  }
+}
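
A hedged sketch of driving the handler directly (e.g. from a test); indexDir, segmentMetadata,
indexLoadingConfig and segmentWriter are placeholders for objects obtained from an already
loaded segment directory:

    // Builds a json index for every configured, unsorted column; columns whose index
    // already exists on disk are skipped inside createJsonIndices().
    JSONIndexHandler jsonIndexHandler =
        new JSONIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
    jsonIndexHandler.createJsonIndices();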
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/JSONIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/JSONIndexReader.java
new file mode 100644
index 0000000..efb6c8b
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/JSONIndexReader.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.readers;
+
+import com.google.common.base.Preconditions;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+public class JSONIndexReader implements Closeable {
+
+  private static final int EXPECTED_VERSION = 1;
+  private static final int DICT_HEADER_INDEX = 0;
+  private static final int DICT_OFFSET_INDEX = 1;
+  private static final int DICT_DATA_INDEX = 2;
+  private static final int INV_OFFSET_INDEX = 3;
+  private static final int INV_DATA_INDEX = 4;
+  private static final int FLATTENED_2_ROOT_INDEX = 5;
+
+  private final BitmapInvertedIndexReader invertedIndexReader;
+  private final StringDictionary dictionary;
+  private final long cardinality;
+  private final long numFlattenedDocs;
+
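+  // On-disk layout, as read by the constructor below (all offsets in bytes):
+  //   int version                   at offset 0
+  //   int maxKeyLength              at offset 4
+  //   (long startOffset, long size) x 6, starting at offset 8, one pair per buffer in the
+  //                                 order of the *_INDEX constants above
+  //   buffer payloads               at the recorded start offsets
+  // cardinality and numFlattenedDocs are derived from the dictionary-offset and
+  // flattened-to-root mapping buffer sizes respectively.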
+  public JSONIndexReader(PinotDataBuffer pinotDataBuffer) {
+    int version = pinotDataBuffer.getInt(0);
+    int maxKeyLength = pinotDataBuffer.getInt(Integer.BYTES);
+    Preconditions.checkState(version == EXPECTED_VERSION, String
+        .format("Index version: %d is not supported by this reader, expected version: %d", version,
+            EXPECTED_VERSION));
+
+    // dictionaryHeaderFile, dictionaryOffsetFile, dictionaryFile, invIndexOffsetFile, invIndexFile,
+    // flattenedDocId2DocIdMappingFile
+    int numBuffers = 6;
+    long[] bufferStartOffsets = new long[numBuffers];
+    long[] bufferSizeArray = new long[numBuffers];
+    for (int i = 0; i < numBuffers; i++) {
+      bufferStartOffsets[i] = pinotDataBuffer.getLong(2 * Integer.BYTES + 2 * i * Long.BYTES);
+      bufferSizeArray[i] = pinotDataBuffer.getLong(2 * Integer.BYTES + 2 * i * Long.BYTES + Long.BYTES);
+    }
+    cardinality = bufferSizeArray[DICT_OFFSET_INDEX] / Integer.BYTES - 1;
+    numFlattenedDocs = bufferSizeArray[FLATTENED_2_ROOT_INDEX] / Integer.BYTES;
+
+    long dictionaryStartOffset = bufferStartOffsets[DICT_HEADER_INDEX];
+    long dictionarySize =
+        bufferSizeArray[DICT_HEADER_INDEX] + bufferSizeArray[DICT_OFFSET_INDEX] + bufferSizeArray[DICT_DATA_INDEX];
+
+    //TODO: REMOVE DEBUG START
+    byte[] dictHeaderBytes = new byte[(int) bufferSizeArray[DICT_HEADER_INDEX]];
+    pinotDataBuffer.copyTo(bufferStartOffsets[DICT_HEADER_INDEX], dictHeaderBytes);
+    System.out.println("Arrays.toString(dictHeaderBytes) = " + Arrays.toString(dictHeaderBytes));
+    //TODO: REMOVE DEBUG END
+
+    PinotDataBuffer dictionaryBuffer =
+        pinotDataBuffer.view(dictionaryStartOffset, dictionaryStartOffset + dictionarySize);
+    dictionary = new StringDictionary(dictionaryBuffer, (int) cardinality, maxKeyLength, (byte) 0);
+
+    long invIndexStartOffset = bufferStartOffsets[INV_OFFSET_INDEX];
+    long invIndexSize = bufferSizeArray[INV_OFFSET_INDEX] + bufferSizeArray[INV_DATA_INDEX];
+
+    PinotDataBuffer invIndexBuffer = pinotDataBuffer.view(invIndexStartOffset, invIndexStartOffset + invIndexSize);
+    invertedIndexReader = new BitmapInvertedIndexReader(invIndexBuffer, (int) cardinality);
+
+    //TODO: REMOVE DEBUG START
+    for (int dictId = 0; dictId < dictionary.length(); dictId++) {
+      System.out.println("Key = " + new String(dictionary.getBytes(dictId)));
+      System.out.println("Posting List = " + invertedIndexReader.getDocIds(dictId));
+    }
+    //TODO: REMOVE DEBUG END
+  }
+
+  /**
+   * Returns the matching document ids for the given search query.
+   */
+  public MutableRoaringBitmap getMatchingDocIds(Predicate predicate) {
+    PredicateEvaluator predicateEvaluator =
+        PredicateEvaluatorProvider.getPredicateEvaluator(predicate, dictionary, FieldSpec.DataType.BYTES);
+    boolean exclusive = predicateEvaluator.isExclusive();
+    int[] dictIds = exclusive ? predicateEvaluator.getNonMatchingDictIds() : predicateEvaluator.getMatchingDictIds();
+    int numDictIds = dictIds.length;
+
+    if (numDictIds == 1) {
+      ImmutableRoaringBitmap docIds = (ImmutableRoaringBitmap) invertedIndexReader.getDocIds(dictIds[0]);
+      if (exclusive) {
+        if (docIds instanceof MutableRoaringBitmap) {
+          MutableRoaringBitmap mutableRoaringBitmap = (MutableRoaringBitmap) docIds;
+          mutableRoaringBitmap.flip(0L, numFlattenedDocs);
+          return mutableRoaringBitmap;
+        } else {
+          return ImmutableRoaringBitmap.flip(docIds, 0L, numFlattenedDocs);
+        }
+      } else {
+        return docIds.toMutableRoaringBitmap();
+      }
+    } else {
+      ImmutableRoaringBitmap[] bitmaps = new ImmutableRoaringBitmap[numDictIds];
+      for (int i = 0; i < numDictIds; i++) {
+        bitmaps[i] = (ImmutableRoaringBitmap) invertedIndexReader.getDocIds(dictIds[i]);
+      }
+      MutableRoaringBitmap docIds = ImmutableRoaringBitmap.or(bitmaps);
+      if (exclusive) {
+        docIds.flip(0L, numFlattenedDocs);
+      }
+      return docIds;
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+  }
+}
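
A minimal usage sketch, assuming jsonIndexBuffer is a PinotDataBuffer mapped over the json
index file and predicate was built by the query layer; note that the returned bitmap is over
flattened json documents, and the sixth buffer (flattened docId -> root docId) is what a caller
would use to map matches back to root documents, though this class does not expose that mapping
yet:

    JSONIndexReader reader = new JSONIndexReader(jsonIndexBuffer);
    MutableRoaringBitmap flattenedMatches = reader.getMatchingDocIds(predicate);
    flattenedMatches.forEach((int flattenedDocId) -> {
      // Translate flattenedDocId to a root document id before building the filter block.
    });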
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
index dcd21df..f9063b8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
@@ -25,7 +25,8 @@ public enum ColumnIndexType {
   BLOOM_FILTER("bloom_filter"),
   NULLVALUE_VECTOR("nullvalue_vector"),
   TEXT_INDEX("text_index"),
-  RANGE_INDEX("range_index");
+  RANGE_INDEX("range_index"),
+  JSON_INDEX("json_index");
 
   private final String indexName;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
index 51dd2fb..4ffc133 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
@@ -26,6 +26,7 @@ import java.nio.ByteOrder;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.creator.impl.text.LuceneTextIndexCreator;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@@ -135,6 +136,10 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
       case TEXT_INDEX:
         filename = column + LuceneTextIndexCreator.LUCENE_TEXT_INDEX_FILE_EXTENSION;
         break;
+      case JSON_INDEX:
+        filename = column + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION;
+        break;
+
       default:
         throw new UnsupportedOperationException("Unknown index type: " + indexType.toString());
     }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index 3dd137b..7965654 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -27,6 +27,7 @@ import org.apache.pinot.spi.config.BaseJsonConfig;
 public class IndexingConfig extends BaseJsonConfig {
   private List<String> _invertedIndexColumns;
   private List<String> _rangeIndexColumns;
+  private List<String> _jsonIndexColumns;
   private boolean _autoGeneratedInvertedIndex;
   private boolean _createInvertedIndexDuringSegmentGeneration;
   private List<String> _sortedColumn;
@@ -71,6 +72,14 @@ public class IndexingConfig extends BaseJsonConfig {
     _rangeIndexColumns = rangeIndexColumns;
   }
 
+  public List<String> getJsonIndexColumns() {
+    return _jsonIndexColumns;
+  }
+
+  public void setJsonIndexColumns(List<String> jsonIndexColumns) {
+    _jsonIndexColumns = jsonIndexColumns;
+  }
+
   public boolean isAutoGeneratedInvertedIndex() {
     return _autoGeneratedInvertedIndex;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index b04ecb7..c9c245b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -337,7 +337,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec> {
   public enum DataType {
     // LIST is for complex lists which is different from multi-value column of primitives
     // STRUCT, MAP and LIST are composable to form a COMPLEX field
-    INT, LONG, FLOAT, DOUBLE, BOOLEAN/* Stored as STRING */, STRING, BYTES, STRUCT, MAP, LIST;
+    INT, LONG, FLOAT, DOUBLE, BOOLEAN/* Stored as STRING */, STRING, BYTES, STRUCT, MAP, LIST, JSON;
 
     /**
      * Returns the data type stored in Pinot.
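
Finally, a short sketch of declaring a column with the new JSON data type; DimensionFieldSpec
is existing Pinot SPI, the field name is illustrative, and how JSON values are stored
internally (for example as STRING, like BOOLEAN above) is not settled by this diff:

    import org.apache.pinot.spi.data.DimensionFieldSpec;
    import org.apache.pinot.spi.data.FieldSpec;

    // A single-value dimension column holding raw json payloads.
    FieldSpec jsonField = new DimensionFieldSpec("payload", FieldSpec.DataType.JSON, /* singleValue */ true);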