This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new c76ce2a  Support null value fields in generic row ser/de (#6968)
c76ce2a is described below

commit c76ce2aa9616b4b8ac61a99776f56be525e9c0f8
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Tue May 25 20:57:21 2021 -0700

    Support null value fields in generic row ser/de (#6968)

    - Replace `GenericRowSerDeUtils` with `GenericRowSerializer` and
      `GenericRowDeserializer` for the following enhancements:
      - Support ser/de of null value fields to preserve the null field information
      - Cache the encoded string bytes to avoid encoding strings twice
      - Support partially deserializing only the needed fields when sorting the rows
    - Add `SortOrderComparator` to compare the partially deserialized values
    - Modify `ConcatCollector` to use the new classes
    - Add `GenericRowSerDeTest` to test the new classes
---
 .../processing/collector/ConcatCollector.java      |  65 +++--
 .../processing/collector/SortOrderComparator.java  |  72 +++++
 .../processing/serde/GenericRowDeserializer.java   | 223 +++++++++++++++
 .../processing/serde/GenericRowSerializer.java     | 237 ++++++++++++++++
 .../pinot/core/util/GenericRowSerDeUtils.java      | 316 ---------------------
 .../processing/serde/GenericRowSerDeTest.java      | 137 +++++++++
 .../apache/pinot/spi/data/readers/GenericRow.java  |   7 +
 7 files changed, 717 insertions(+), 340 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java index 7fad934..d760274 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java @@ -26,15 +26,16 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; -import java.util.Comparator; import java.util.Iterator; import java.util.List; import javax.annotation.Nullable; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.FileUtils; -import org.apache.pinot.core.util.GenericRowSerDeUtils; +import org.apache.pinot.core.segment.processing.serde.GenericRowDeserializer; +import org.apache.pinot.core.segment.processing.serde.GenericRowSerializer; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; @@ -46,11 +47,14 @@ public class ConcatCollector implements Collector { private static final String RECORDS_FILE_NAME = "collector.records"; private final List<FieldSpec> _fieldSpecs = new ArrayList<>(); - private final Comparator<GenericRow> _genericRowComparator; - private int _numDocs; - + private final GenericRowSerializer _genericRowSerializer; + private final int _numSortColumns; + private final SortOrderComparator _sortOrderComparator; private final File _workingDir; private final File _collectorRecordFile; + + private int _numDocs; + // TODO: Avoid using BufferedOutputStream, and use ByteBuffer directly. // However, ByteBuffer has a limitation that the size cannot exceed 2G. // There are no limits on the size of data inserted into the {@link Collector}.
@@ -58,21 +62,38 @@ public class ConcatCollector implements Collector { private BufferedOutputStream _collectorRecordOutputStream; private List<Long> _collectorRecordOffsets; private PinotDataBuffer _collectorRecordBuffer; + private GenericRowDeserializer _genericRowDeserializer; public ConcatCollector(CollectorConfig collectorConfig, Schema schema) { - - for (FieldSpec spec : schema.getAllFieldSpecs()) { - if (!spec.isVirtualColumn()) { - _fieldSpecs.add(spec); - } - } List<String> sortOrder = collectorConfig.getSortOrder(); if (CollectionUtils.isNotEmpty(sortOrder)) { - GenericRowSorter sorter = new GenericRowSorter(sortOrder, schema); - _genericRowComparator = sorter.getGenericRowComparator(); + _numSortColumns = sortOrder.size(); + DataType[] sortColumnStoredTypes = new DataType[_numSortColumns]; + for (int i = 0; i < _numSortColumns; i++) { + String sortColumn = sortOrder.get(i); + FieldSpec fieldSpec = schema.getFieldSpecFor(sortColumn); + Preconditions.checkArgument(fieldSpec != null, "Failed to find sort column: %s", sortColumn); + Preconditions.checkArgument(fieldSpec.isSingleValueField(), "Cannot sort on MV column: %s", sortColumn); + sortColumnStoredTypes[i] = fieldSpec.getDataType().getStoredType(); + _fieldSpecs.add(fieldSpec); + } + _sortOrderComparator = new SortOrderComparator(_numSortColumns, sortColumnStoredTypes); + for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { + if (!fieldSpec.isVirtualColumn() && !sortOrder.contains(fieldSpec.getName())) { + _fieldSpecs.add(fieldSpec); + } + } } else { - _genericRowComparator = null; + _numSortColumns = 0; + _sortOrderComparator = null; + for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { + if (!fieldSpec.isVirtualColumn()) { + _fieldSpecs.add(fieldSpec); + } + } } + // TODO: Pass 'includeNullFields' from the config + _genericRowSerializer = new GenericRowSerializer(_fieldSpecs, true); _workingDir = new File(FileUtils.getTempDirectory(), String.format("concat_collector_%d", System.currentTimeMillis())); @@ -84,7 +105,6 @@ public class ConcatCollector implements Collector { } private void initializeBuffer() { - Preconditions.checkState(!_collectorRecordFile.exists(), "Collector record file: " + _collectorRecordFile + " already exists"); try { @@ -100,7 +120,7 @@ public class ConcatCollector implements Collector { @Override public void collect(GenericRow genericRow) throws IOException { - byte[] genericRowBytes = GenericRowSerDeUtils.serializeGenericRow(genericRow, _fieldSpecs); + byte[] genericRowBytes = _genericRowSerializer.serialize(genericRow); _collectorRecordOutputStream.write(genericRowBytes); _collectorRecordOffsets.add(_collectorRecordOffsets.get(_numDocs) + genericRowBytes.length); _numDocs++; @@ -109,14 +129,14 @@ public class ConcatCollector implements Collector { @Override public Iterator<GenericRow> iterator() throws IOException { - _collectorRecordOutputStream.flush(); _collectorRecordBuffer = PinotDataBuffer .mapFile(_collectorRecordFile, true, 0, _collectorRecordOffsets.get(_numDocs), PinotDataBuffer.NATIVE_ORDER, "ConcatCollector: generic row buffer"); + _genericRowDeserializer = new GenericRowDeserializer(_collectorRecordBuffer, _fieldSpecs, true); // TODO: A lot of this code can be made common across Collectors, once {@link RollupCollector} is also converted to off heap implementation - if (_genericRowComparator != null) { + if (_numSortColumns != 0) { int[] sortedDocIds = new int[_numDocs]; for (int i = 0; i < _numDocs; i++) { sortedDocIds[i] = i; @@ -125,11 +145,8 @@ public class ConcatCollector 
implements Collector { Arrays.quickSort(0, _numDocs, (i1, i2) -> { long startOffset1 = _collectorRecordOffsets.get(sortedDocIds[i1]); long startOffset2 = _collectorRecordOffsets.get(sortedDocIds[i2]); - GenericRow row1 = GenericRowSerDeUtils - .deserializeGenericRow(_collectorRecordBuffer, startOffset1, _fieldSpecs, new GenericRow()); - GenericRow row2 = GenericRowSerDeUtils - .deserializeGenericRow(_collectorRecordBuffer, startOffset2, _fieldSpecs, new GenericRow()); - return _genericRowComparator.compare(row1, row2); + return _sortOrderComparator.compare(_genericRowDeserializer.partialDeserialize(startOffset1, _numSortColumns), + _genericRowDeserializer.partialDeserialize(startOffset2, _numSortColumns)); }, (i1, i2) -> { int temp = sortedDocIds[i1]; sortedDocIds[i1] = sortedDocIds[i2]; @@ -159,7 +176,7 @@ public class ConcatCollector implements Collector { } else { offset = _collectorRecordOffsets.get(sortedDocIds[_nextDocId++]); } - return GenericRowSerDeUtils.deserializeGenericRow(_collectorRecordBuffer, offset, _fieldSpecs, _reuse); + return _genericRowDeserializer.deserialize(offset, _reuse); } }; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/SortOrderComparator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/SortOrderComparator.java new file mode 100644 index 0000000..34b63b6 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/SortOrderComparator.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.processing.collector; + +import java.util.Comparator; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.utils.ByteArray; + + +/** + * Comparator for values of the sort columns. 
+ */ +public class SortOrderComparator implements Comparator<Object[]> { + private final int _numSortColumns; + private final DataType[] _sortColumnStoredTypes; + + public SortOrderComparator(int numSortColumns, DataType[] sortColumnStoredTypes) { + _numSortColumns = numSortColumns; + _sortColumnStoredTypes = sortColumnStoredTypes; + } + + @Override + public int compare(Object[] o1, Object[] o2) { + for (int i = 0; i < _numSortColumns; i++) { + Object value1 = o1[i]; + Object value2 = o2[i]; + int result; + switch (_sortColumnStoredTypes[i]) { + case INT: + result = Integer.compare((int) value1, (int) value2); + break; + case LONG: + result = Long.compare((long) value1, (long) value2); + break; + case FLOAT: + result = Float.compare((float) value1, (float) value2); + break; + case DOUBLE: + result = Double.compare((double) value1, (double) value2); + break; + case STRING: + result = ((String) value1).compareTo((String) value2); + break; + case BYTES: + result = ByteArray.compare((byte[]) value1, (byte[]) value2); + break; + default: + throw new IllegalStateException("Unsupported sort column stored type: " + _sortColumnStoredTypes[i]); + } + if (result != 0) { + return result; + } + } + return 0; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowDeserializer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowDeserializer.java new file mode 100644 index 0000000..83a5863 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowDeserializer.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.processing.serde; + +import com.google.common.base.Preconditions; +import java.util.List; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.StringUtils; + + +/** + * Utility class to deserialize the {@link GenericRow}. + * The bytes are read in NATIVE order. The data should be serialized by the {@link GenericRowSerializer} on the same + * host to ensure that both of them are using the same byte order. 
+ */ +public class GenericRowDeserializer { + private final PinotDataBuffer _dataBuffer; + private final int _numFields; + private final String[] _fieldNames; + private final boolean[] _isSingleValueFields; + private final DataType[] _storedTypes; + private final boolean _includeNullFields; + + public GenericRowDeserializer(PinotDataBuffer dataBuffer, List<FieldSpec> fieldSpecs, boolean includeNullFields) { + _dataBuffer = dataBuffer; + _numFields = fieldSpecs.size(); + _fieldNames = new String[_numFields]; + _isSingleValueFields = new boolean[_numFields]; + _storedTypes = new DataType[_numFields]; + for (int i = 0; i < _numFields; i++) { + FieldSpec fieldSpec = fieldSpecs.get(i); + _fieldNames[i] = fieldSpec.getName(); + _isSingleValueFields[i] = fieldSpec.isSingleValueField(); + _storedTypes[i] = fieldSpec.getDataType().getStoredType(); + } + _includeNullFields = includeNullFields; + } + + /** + * Deserializes the {@link GenericRow} at the given offset. + */ + public GenericRow deserialize(long offset, GenericRow reuse) { + reuse.clear(); + + for (int i = 0; i < _numFields; i++) { + String fieldName = _fieldNames[i]; + + if (_isSingleValueFields[i]) { + switch (_storedTypes[i]) { + case INT: + int intValue = _dataBuffer.getInt(offset); + reuse.putValue(fieldName, intValue); + offset += Integer.BYTES; + break; + case LONG: + long longValue = _dataBuffer.getLong(offset); + reuse.putValue(fieldName, longValue); + offset += Long.BYTES; + break; + case FLOAT: + float floatValue = _dataBuffer.getFloat(offset); + reuse.putValue(fieldName, floatValue); + offset += Float.BYTES; + break; + case DOUBLE: + double doubleValue = _dataBuffer.getDouble(offset); + reuse.putValue(fieldName, doubleValue); + offset += Double.BYTES; + break; + case STRING: { + int numBytes = _dataBuffer.getInt(offset); + offset += Integer.BYTES; + byte[] stringBytes = new byte[numBytes]; + _dataBuffer.copyTo(offset, stringBytes); + offset += numBytes; + reuse.putValue(fieldName, StringUtils.decodeUtf8(stringBytes)); + break; + } + case BYTES: { + int numBytes = _dataBuffer.getInt(offset); + offset += Integer.BYTES; + byte[] bytes = new byte[numBytes]; + _dataBuffer.copyTo(offset, bytes); + offset += numBytes; + reuse.putValue(fieldName, bytes); + break; + } + default: + throw new IllegalStateException("Unsupported SV stored type: " + _storedTypes[i]); + } + } else { + int numValues = _dataBuffer.getInt(offset); + offset += Integer.BYTES; + Object[] multiValue = new Object[numValues]; + + switch (_storedTypes[i]) { + case INT: + for (int j = 0; j < numValues; j++) { + multiValue[j] = _dataBuffer.getInt(offset); + offset += Integer.BYTES; + } + break; + case LONG: + for (int j = 0; j < numValues; j++) { + multiValue[j] = _dataBuffer.getLong(offset); + offset += Long.BYTES; + } + break; + case FLOAT: + for (int j = 0; j < numValues; j++) { + multiValue[j] = _dataBuffer.getFloat(offset); + offset += Float.BYTES; + } + break; + case DOUBLE: + for (int j = 0; j < numValues; j++) { + multiValue[j] = _dataBuffer.getDouble(offset); + offset += Double.BYTES; + } + break; + case STRING: + for (int j = 0; j < numValues; j++) { + int numBytes = _dataBuffer.getInt(offset); + offset += Integer.BYTES; + byte[] stringBytes = new byte[numBytes]; + _dataBuffer.copyTo(offset, stringBytes); + offset += numBytes; + multiValue[j] = StringUtils.decodeUtf8(stringBytes); + } + break; + default: + throw new IllegalStateException("Unsupported MV stored type: " + _storedTypes[i]); + } + + reuse.putValue(fieldName, multiValue); + } + } + + // Deserialize 
null fields if enabled + if (_includeNullFields) { + int numNullFields = _dataBuffer.getInt(offset); + offset += Integer.BYTES; + for (int i = 0; i < numNullFields; i++) { + reuse.addNullValueField(_fieldNames[_dataBuffer.getInt(offset)]); + offset += Integer.BYTES; + } + } + + return reuse; + } + + /** + * Deserializes the first several fields at the given offset. This method can be used to sort the generic rows without + * fully deserializing the whole row for each comparison. The selected fields should all be single-valued. + */ + public Object[] partialDeserialize(long offset, int numFields) { + Object[] values = new Object[numFields]; + + for (int i = 0; i < numFields; i++) { + Preconditions.checkState(_isSingleValueFields[i], "Partial deserialize should not be applied to MV column: %s", + _fieldNames[i]); + switch (_storedTypes[i]) { + case INT: + values[i] = _dataBuffer.getInt(offset); + offset += Integer.BYTES; + break; + case LONG: + values[i] = _dataBuffer.getLong(offset); + offset += Long.BYTES; + break; + case FLOAT: + values[i] = _dataBuffer.getFloat(offset); + offset += Float.BYTES; + break; + case DOUBLE: + values[i] = _dataBuffer.getDouble(offset); + offset += Double.BYTES; + break; + case STRING: { + int numBytes = _dataBuffer.getInt(offset); + offset += Integer.BYTES; + byte[] stringBytes = new byte[numBytes]; + _dataBuffer.copyTo(offset, stringBytes); + offset += numBytes; + values[i] = StringUtils.decodeUtf8(stringBytes); + break; + } + case BYTES: { + int numBytes = _dataBuffer.getInt(offset); + offset += Integer.BYTES; + byte[] bytes = new byte[numBytes]; + _dataBuffer.copyTo(offset, bytes); + offset += numBytes; + values[i] = bytes; + break; + } + default: + throw new IllegalStateException("Unsupported SV stored type: " + _storedTypes[i]); + } + } + + return values; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerializer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerializer.java new file mode 100644 index 0000000..7d70201 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerializer.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.processing.serde; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.StringUtils; + + +/** + * Utility class to serialize the {@link GenericRow}.
+ * The bytes are stored in NATIVE order. The data should be deserialized by the {@link GenericRowDeserializer} on the + * same host to ensure that both of them are using the same byte order. + */ +public class GenericRowSerializer { + private final int _numFields; + private final String[] _fieldNames; + private final boolean[] _isSingleValueFields; + private final DataType[] _storedTypes; + // Cache the encoded string bytes + private final Object[] _stringBytes; + // Store index for null fields + private final Map<String, Integer> _fieldIndexMap; + private final int[] _nullFieldIndexes; + + public GenericRowSerializer(List<FieldSpec> fieldSpecs, boolean includeNullFields) { + _numFields = fieldSpecs.size(); + _fieldNames = new String[_numFields]; + _isSingleValueFields = new boolean[_numFields]; + _storedTypes = new DataType[_numFields]; + _stringBytes = new Object[_numFields]; + for (int i = 0; i < _numFields; i++) { + FieldSpec fieldSpec = fieldSpecs.get(i); + _fieldNames[i] = fieldSpec.getName(); + _isSingleValueFields[i] = fieldSpec.isSingleValueField(); + _storedTypes[i] = fieldSpec.getDataType().getStoredType(); + } + if (includeNullFields) { + _fieldIndexMap = new HashMap<>(); + for (int i = 0; i < _numFields; i++) { + _fieldIndexMap.put(_fieldNames[i], i); + } + _nullFieldIndexes = new int[_numFields]; + } else { + _fieldIndexMap = null; + _nullFieldIndexes = null; + } + } + + /** + * Serializes the given {@link GenericRow}. + */ + public byte[] serialize(GenericRow row) { + int numBytes = 0; + + // First pass: calculate the number of bytes required + for (int i = 0; i < _numFields; i++) { + Object value = row.getValue(_fieldNames[i]); + + if (_isSingleValueFields[i]) { + switch (_storedTypes[i]) { + case INT: + numBytes += Integer.BYTES; + break; + case LONG: + numBytes += Long.BYTES; + break; + case FLOAT: + numBytes += Float.BYTES; + break; + case DOUBLE: + numBytes += Double.BYTES; + break; + case STRING: + byte[] stringBytes = StringUtils.encodeUtf8((String) value); + numBytes += Integer.BYTES + stringBytes.length; + _stringBytes[i] = stringBytes; + break; + case BYTES: + numBytes += Integer.BYTES + ((byte[]) value).length; + break; + default: + throw new IllegalStateException("Unsupported SV stored type: " + _storedTypes[i]); + } + } else { + Object[] multiValue = (Object[]) value; + int numValues = multiValue.length; + numBytes += Integer.BYTES; // Number of values + + switch (_storedTypes[i]) { + case INT: + numBytes += Integer.BYTES * numValues; + break; + case LONG: + numBytes += Long.BYTES * numValues; + break; + case FLOAT: + numBytes += Float.BYTES * numValues; + break; + case DOUBLE: + numBytes += Double.BYTES * numValues; + break; + case STRING: + numBytes += Integer.BYTES * numValues; + byte[][] stringBytesArray = new byte[numValues][]; + for (int j = 0; j < numValues; j++) { + byte[] stringBytes = StringUtils.encodeUtf8((String) multiValue[j]); + numBytes += stringBytes.length; + stringBytesArray[j] = stringBytes; + } + _stringBytes[i] = stringBytesArray; + break; + default: + throw new IllegalStateException("Unsupported MV stored type: " + _storedTypes[i]); + } + } + } + + // Serialize null fields if enabled + int numNullFields = 0; + if (_fieldIndexMap != null) { + Set<String> nullFields = row.getNullValueFields(); + for (String nullField : nullFields) { + Integer nullFieldIndex = _fieldIndexMap.get(nullField); + if (nullFieldIndex != null) { + _nullFieldIndexes[numNullFields++] = nullFieldIndex; + } + } + numBytes += Integer.BYTES * (1 + numNullFields); + } + + 
byte[] serializedBytes = new byte[numBytes]; + ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes).order(PinotDataBuffer.NATIVE_ORDER); + + // Second pass: serialize the values + for (int i = 0; i < _numFields; i++) { + Object value = row.getValue(_fieldNames[i]); + + if (_isSingleValueFields[i]) { + switch (_storedTypes[i]) { + case INT: + byteBuffer.putInt((int) value); + break; + case LONG: + byteBuffer.putLong((long) value); + break; + case FLOAT: + byteBuffer.putFloat((float) value); + break; + case DOUBLE: + byteBuffer.putDouble((double) value); + break; + case STRING: + byte[] stringBytes = (byte[]) _stringBytes[i]; + byteBuffer.putInt(stringBytes.length); + byteBuffer.put(stringBytes); + break; + case BYTES: + byte[] bytes = (byte[]) value; + byteBuffer.putInt(bytes.length); + byteBuffer.put(bytes); + break; + default: + throw new IllegalStateException("Unsupported SV stored type: " + _storedTypes[i]); + } + } else { + Object[] multiValue = (Object[]) value; + byteBuffer.putInt(multiValue.length); + + switch (_storedTypes[i]) { + case INT: + for (Object element : multiValue) { + byteBuffer.putInt((int) element); + } + break; + case LONG: + for (Object element : multiValue) { + byteBuffer.putLong((long) element); + } + break; + case FLOAT: + for (Object element : multiValue) { + byteBuffer.putFloat((float) element); + } + break; + case DOUBLE: + for (Object element : multiValue) { + byteBuffer.putDouble((double) element); + } + break; + case STRING: + byte[][] stringBytesArray = (byte[][]) _stringBytes[i]; + for (byte[] stringBytes : stringBytesArray) { + byteBuffer.putInt(stringBytes.length); + byteBuffer.put(stringBytes); + } + break; + default: + throw new IllegalStateException("Unsupported MV stored type: " + _storedTypes[i]); + } + } + } + + // Serialize null fields if enabled + if (_fieldIndexMap != null) { + byteBuffer.putInt(numNullFields); + for (int i = 0; i < numNullFields; i++) { + byteBuffer.putInt(_nullFieldIndexes[i]); + } + } + + return serializedBytes; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/GenericRowSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/GenericRowSerDeUtils.java deleted file mode 100644 index 2505c8c..0000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/GenericRowSerDeUtils.java +++ /dev/null @@ -1,316 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.core.util; - -import java.nio.ByteBuffer; -import java.util.List; -import org.apache.pinot.segment.spi.memory.PinotDataBuffer; -import org.apache.pinot.spi.data.FieldSpec; -import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.utils.StringUtils; - - -/** - * Utility methods for serde of {@link GenericRow} - * Deserialization assumes it is deserializing from a {@link PinotDataBuffer} - */ -public final class GenericRowSerDeUtils { - - private GenericRowSerDeUtils() { - - } - - /** - * Serialize the given GenericRow. The data is stored in native byte order. - * @param genericRow GenericRow to serialize - * @param fieldSpecs the fields to serialize - * @return serialized bytes - */ - public static byte[] serializeGenericRow(GenericRow genericRow, List<FieldSpec> fieldSpecs) { - int numBytes = 0; - - for (FieldSpec fieldSpec : fieldSpecs) { - Object value = genericRow.getValue(fieldSpec.getName()); - - if (fieldSpec.isSingleValueField()) { - switch (fieldSpec.getDataType().getStoredType()) { - - case INT: - numBytes += Integer.BYTES; - break; - case LONG: - numBytes += Long.BYTES; - break; - case FLOAT: - numBytes += Float.BYTES; - break; - case DOUBLE: - numBytes += Double.BYTES; - break; - case STRING: - byte[] stringBytes = StringUtils.encodeUtf8((String) value); - numBytes += Integer.BYTES; // string length - numBytes += stringBytes.length; - break; - case BYTES: - numBytes += Integer.BYTES; // byte array length - numBytes += ((byte[]) value).length; - break; - default: - throw new UnsupportedOperationException( - String.format("DataType '%s' not supported", fieldSpec.getDataType())); - } - } else { - Object[] multiValue = (Object[]) value; - numBytes += Integer.BYTES; // array length - - switch (fieldSpec.getDataType().getStoredType()) { - case INT: - numBytes += Integer.BYTES * multiValue.length; - break; - case LONG: - numBytes += Long.BYTES * multiValue.length; - break; - case FLOAT: - numBytes += Float.BYTES * multiValue.length; - break; - case DOUBLE: - numBytes += Double.BYTES * multiValue.length; - break; - case STRING: - for (Object element : multiValue) { - byte[] stringBytes = StringUtils.encodeUtf8((String) element); - numBytes += Integer.BYTES; // string length - numBytes += stringBytes.length; - } - break; - case BYTES: - for (Object element : multiValue) { - numBytes += Integer.BYTES; // byte array length - numBytes += ((byte[]) element).length; - } - break; - default: - throw new UnsupportedOperationException( - String.format("DataType '%s' not supported", fieldSpec.getDataType())); - } - } - } - - byte[] genericRowBytes = new byte[numBytes]; - ByteBuffer byteBuffer = ByteBuffer.wrap(genericRowBytes).order(PinotDataBuffer.NATIVE_ORDER); - - for (FieldSpec fieldSpec : fieldSpecs) { - Object value = genericRow.getValue(fieldSpec.getName()); - - if (fieldSpec.isSingleValueField()) { - switch (fieldSpec.getDataType().getStoredType()) { - - case INT: - byteBuffer.putInt((int) value); - break; - case LONG: - byteBuffer.putLong((long) value); - break; - case FLOAT: - byteBuffer.putFloat((float) value); - break; - case DOUBLE: - byteBuffer.putDouble((double) value); - break; - case STRING: - byte[] stringBytes = StringUtils.encodeUtf8((String) value); - byteBuffer.putInt(stringBytes.length); - byteBuffer.put(stringBytes); - break; - case BYTES: - byte[] bytes = (byte[]) value; - byteBuffer.putInt(bytes.length); - byteBuffer.put(bytes); - break; - default: - throw new UnsupportedOperationException( - String.format("DataType 
'%s' not supported", fieldSpec.getDataType())); - } - } else { - Object[] multiValue = (Object[]) value; - byteBuffer.putInt(multiValue.length); - - switch (fieldSpec.getDataType().getStoredType()) { - - case INT: - for (Object element : multiValue) { - byteBuffer.putInt((int) element); - } - break; - case LONG: - for (Object element : multiValue) { - byteBuffer.putLong((long) element); - } - break; - case FLOAT: - for (Object element : multiValue) { - byteBuffer.putFloat((float) element); - } - break; - case DOUBLE: - for (Object element : multiValue) { - byteBuffer.putDouble((double) element); - } - break; - case STRING: - for (Object element : multiValue) { - byte[] stringBytes = StringUtils.encodeUtf8((String) element); - byteBuffer.putInt(stringBytes.length); - byteBuffer.put(stringBytes); - } - break; - case BYTES: - for (Object element : multiValue) { - byte[] bytes = (byte[]) element; - byteBuffer.putInt(bytes.length); - byteBuffer.put(bytes); - } - break; - default: - throw new UnsupportedOperationException( - String.format("DataType '%s' not supported", fieldSpec.getDataType())); - } - } - } - return genericRowBytes; - } - - /** - * Deserializes bytes from the native order data buffer to GenericRow - * @param dataBuffer the pinot data buffer - * @param offset offset to begin reading from - * @param fieldSpecs list of field specs to determine fields in deserialization - * @param reuse GenericRow object for returning - * @return Deserialized GenericRow - */ - public static GenericRow deserializeGenericRow(PinotDataBuffer dataBuffer, long offset, List<FieldSpec> fieldSpecs, - GenericRow reuse) { - for (FieldSpec fieldSpec : fieldSpecs) { - String fieldName = fieldSpec.getName(); - if (fieldSpec.isSingleValueField()) { - switch (fieldSpec.getDataType().getStoredType()) { - - case INT: - int intValue = dataBuffer.getInt(offset); - reuse.putValue(fieldName, intValue); - offset += Integer.BYTES; - break; - case LONG: - long longValue = dataBuffer.getLong(offset); - reuse.putValue(fieldName, longValue); - offset += Long.BYTES; - break; - case FLOAT: - float floatValue = dataBuffer.getFloat(offset); - reuse.putValue(fieldName, floatValue); - offset += Float.BYTES; - break; - case DOUBLE: - double doubleValue = dataBuffer.getDouble(offset); - reuse.putValue(fieldName, doubleValue); - offset += Double.BYTES; - break; - case STRING: - int stringSize = dataBuffer.getInt(offset); - offset += Integer.BYTES; - byte[] stringBytes = new byte[stringSize]; - dataBuffer.copyTo(offset, stringBytes); - offset += stringSize; - reuse.putValue(fieldName, StringUtils.decodeUtf8(stringBytes)); - break; - case BYTES: - int bytesSize = dataBuffer.getInt(offset); - offset += Integer.BYTES; - byte[] bytes = new byte[bytesSize]; - dataBuffer.copyTo(offset, bytes); - offset += bytesSize; - reuse.putValue(fieldName, bytes); - break; - default: - throw new UnsupportedOperationException( - String.format("DataType '%s' not supported", fieldSpec.getDataType())); - } - } else { - - int numMultiValues = dataBuffer.getInt(offset); - offset += Integer.BYTES; - Object[] values = new Object[numMultiValues]; - - switch (fieldSpec.getDataType().getStoredType()) { - - case INT: - for (int i = 0; i < numMultiValues; i++) { - values[i] = dataBuffer.getInt(offset); - offset += Integer.BYTES; - } - break; - case LONG: - for (int i = 0; i < numMultiValues; i++) { - values[i] = dataBuffer.getLong(offset); - offset += Long.BYTES; - } - break; - case FLOAT: - for (int i = 0; i < numMultiValues; i++) { - values[i] = 
dataBuffer.getFloat(offset); - offset += Float.BYTES; - } - break; - case DOUBLE: - for (int i = 0; i < numMultiValues; i++) { - values[i] = dataBuffer.getDouble(offset); - offset += Double.BYTES; - } - break; - case STRING: - for (int i = 0; i < numMultiValues; i++) { - int stringSize = dataBuffer.getInt(offset); - offset += Integer.BYTES; - byte[] stringBytes = new byte[stringSize]; - dataBuffer.copyTo(offset, stringBytes); - offset += stringSize; - values[i] = StringUtils.decodeUtf8(stringBytes); - } - break; - case BYTES: - for (int i = 0; i < numMultiValues; i++) { - int bytesSize = dataBuffer.getInt(offset); - offset += Integer.BYTES; - byte[] bytes = new byte[bytesSize]; - dataBuffer.copyTo(offset, bytes); - offset += bytesSize; - values[i] = bytes; - } - break; - default: - throw new UnsupportedOperationException( - String.format("DataType '%s' not supported", fieldSpec.getDataType())); - } - reuse.putValue(fieldName, values); - } - } - return reuse; - } -} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerDeTest.java new file mode 100644 index 0000000..fc2d8ab --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerDeTest.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.segment.processing.serde; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +public class GenericRowSerDeTest { + private List<FieldSpec> _fieldSpecs; + private GenericRow _row; + + @BeforeClass + public void setUp() { + _fieldSpecs = Arrays.asList(new DimensionFieldSpec("intSV", DataType.INT, true), + new DimensionFieldSpec("longSV", DataType.LONG, true), new DimensionFieldSpec("floatSV", DataType.FLOAT, true), + new DimensionFieldSpec("doubleSV", DataType.DOUBLE, true), + new DimensionFieldSpec("stringSV", DataType.STRING, true), + new DimensionFieldSpec("bytesSV", DataType.BYTES, true), new DimensionFieldSpec("nullSV", DataType.INT, true), + new DimensionFieldSpec("intMV", DataType.INT, false), new DimensionFieldSpec("longMV", DataType.LONG, false), + new DimensionFieldSpec("floatMV", DataType.FLOAT, false), + new DimensionFieldSpec("doubleMV", DataType.DOUBLE, false), + new DimensionFieldSpec("stringMV", DataType.STRING, false), + new DimensionFieldSpec("nullMV", DataType.LONG, false)); + + _row = new GenericRow(); + _row.putValue("intSV", 123); + _row.putValue("longSV", 123L); + _row.putValue("floatSV", 123.0f); + _row.putValue("doubleSV", 123.0); + _row.putValue("stringSV", "123"); + _row.putValue("bytesSV", new byte[]{1, 2, 3}); + _row.putDefaultNullValue("nullSV", Integer.MAX_VALUE); + _row.putValue("intMV", new Object[]{123, 456}); + _row.putValue("longMV", new Object[]{123L, 456L}); + _row.putValue("floatMV", new Object[]{123.0f, 456.0f}); + _row.putValue("doubleMV", new Object[]{123.0, 456.0}); + _row.putValue("stringMV", new Object[]{"123", "456"}); + _row.putDefaultNullValue("nullMV", new Object[]{Long.MIN_VALUE}); + } + + @Test + public void testSerDeWithoutNullFields() { + GenericRowSerializer serializer = new GenericRowSerializer(_fieldSpecs, false); + byte[] bytes = serializer.serialize(_row); + PinotDataBuffer dataBuffer = PinotDataBuffer.allocateDirect(bytes.length, PinotDataBuffer.NATIVE_ORDER, null); + dataBuffer.readFrom(0L, bytes); + GenericRowDeserializer deserializer = new GenericRowDeserializer(dataBuffer, _fieldSpecs, false); + GenericRow reuse = new GenericRow(); + deserializer.deserialize(0L, reuse); + Map<String, Object> actualValueMap = reuse.getFieldToValueMap(); + Map<String, Object> expectedValueMap = _row.getFieldToValueMap(); + // NOTE: Cannot directly assert equals on maps because they contain arrays + assertEquals(actualValueMap.size(), expectedValueMap.size()); + for (Map.Entry<String, Object> entry : expectedValueMap.entrySet()) { + assertEquals(actualValueMap.get(entry.getKey()), entry.getValue()); + } + assertTrue(reuse.getNullValueFields().isEmpty()); + } + + @Test + public void testSerDeWithNullFields() { + GenericRowSerializer serializer = new GenericRowSerializer(_fieldSpecs, true); + byte[] bytes = serializer.serialize(_row); + PinotDataBuffer dataBuffer = PinotDataBuffer.allocateDirect(bytes.length, PinotDataBuffer.NATIVE_ORDER, null); + dataBuffer.readFrom(0L, bytes); + 
GenericRowDeserializer deserializer = new GenericRowDeserializer(dataBuffer, _fieldSpecs, true); + GenericRow reuse = new GenericRow(); + deserializer.deserialize(0L, reuse); + assertEquals(reuse, _row); + } + + @Test + public void testSerDeWithPartialFields() { + List<FieldSpec> fieldSpecs = Arrays.asList(new DimensionFieldSpec("intSV", DataType.INT, true), + new DimensionFieldSpec("nullSV", DataType.INT, true)); + GenericRowSerializer serializer = new GenericRowSerializer(fieldSpecs, true); + byte[] bytes = serializer.serialize(_row); + PinotDataBuffer dataBuffer = PinotDataBuffer.allocateDirect(bytes.length, PinotDataBuffer.NATIVE_ORDER, null); + dataBuffer.readFrom(0L, bytes); + GenericRowDeserializer deserializer = new GenericRowDeserializer(dataBuffer, fieldSpecs, true); + GenericRow reuse = new GenericRow(); + deserializer.deserialize(0L, reuse); + Map<String, Object> fieldToValueMap = reuse.getFieldToValueMap(); + assertEquals(fieldToValueMap.size(), 2); + assertEquals(fieldToValueMap.get("intSV"), _row.getValue("intSV")); + assertEquals(fieldToValueMap.get("nullSV"), _row.getValue("nullSV")); + Set<String> nullValueFields = reuse.getNullValueFields(); + assertEquals(nullValueFields, Collections.singleton("nullSV")); + } + + @Test + public void testPartialDeserialize() { + GenericRowSerializer serializer = new GenericRowSerializer(_fieldSpecs, true); + byte[] bytes = serializer.serialize(_row); + PinotDataBuffer dataBuffer = PinotDataBuffer.allocateDirect(bytes.length, PinotDataBuffer.NATIVE_ORDER, null); + dataBuffer.readFrom(0L, bytes); + GenericRowDeserializer deserializer = new GenericRowDeserializer(dataBuffer, _fieldSpecs, true); + Object[] values = deserializer.partialDeserialize(0L, 7); + assertEquals(values[0], _row.getValue("intSV")); + assertEquals(values[1], _row.getValue("longSV")); + assertEquals(values[2], _row.getValue("floatSV")); + assertEquals(values[3], _row.getValue("doubleSV")); + assertEquals(values[4], _row.getValue("stringSV")); + assertEquals(values[5], _row.getValue("bytesSV")); + assertEquals(values[6], _row.getValue("nullSV")); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java index f4875ec..a88886c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java @@ -198,6 +198,13 @@ public class GenericRow implements Serializable { } /** + * Marks a field as {@code null}. + */ + public void addNullValueField(String fieldName) { + _nullValueFields.add(fieldName); + } + + /** * Removes all the fields from the row. */ public void clear() {
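
A minimal round-trip sketch of the new ser/de classes (illustrative only, not part of the patch; the example class, the field names "memberId"/"name", and the values are made up, while the buffer handling follows GenericRowSerDeTest above):

import java.util.Arrays;
import java.util.List;
import org.apache.pinot.core.segment.processing.serde.GenericRowDeserializer;
import org.apache.pinot.core.segment.processing.serde.GenericRowSerializer;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.readers.GenericRow;

public class GenericRowSerDeExample {
  public static void main(String[] args) {
    // Both sides must use the same field specs, in the same order, and the
    // same 'includeNullFields' flag
    List<FieldSpec> fieldSpecs = Arrays.asList(
        new DimensionFieldSpec("memberId", DataType.LONG, true),
        new DimensionFieldSpec("name", DataType.STRING, true));
    GenericRowSerializer serializer = new GenericRowSerializer(fieldSpecs, true);

    GenericRow row = new GenericRow();
    row.putValue("memberId", 123L);
    row.putDefaultNullValue("name", "null");  // marks 'name' as a null field

    // Serialize, then load the bytes into a NATIVE-order buffer on the same host
    byte[] bytes = serializer.serialize(row);
    PinotDataBuffer dataBuffer =
        PinotDataBuffer.allocateDirect(bytes.length, PinotDataBuffer.NATIVE_ORDER, null);
    dataBuffer.readFrom(0L, bytes);

    // Deserialize into a reusable row; the null field information is preserved
    GenericRowDeserializer deserializer = new GenericRowDeserializer(dataBuffer, fieldSpecs, true);
    GenericRow reuse = new GenericRow();
    deserializer.deserialize(0L, reuse);
    // reuse.getNullValueFields() now contains "name"
  }
}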
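
And a sketch of the sort path, where rows are compared by deserializing only the leading sort columns (again illustrative; the single LONG sort column and the hand-tracked offsets are assumptions of the example, mirroring how ConcatCollector uses its per-row offsets):

import java.util.Collections;
import java.util.List;
import org.apache.pinot.core.segment.processing.collector.SortOrderComparator;
import org.apache.pinot.core.segment.processing.serde.GenericRowDeserializer;
import org.apache.pinot.core.segment.processing.serde.GenericRowSerializer;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.readers.GenericRow;

public class SortOrderComparatorExample {
  public static void main(String[] args) {
    // The sort columns must come first in the field spec list
    List<FieldSpec> fieldSpecs =
        Collections.singletonList(new DimensionFieldSpec("memberId", DataType.LONG, true));
    GenericRowSerializer serializer = new GenericRowSerializer(fieldSpecs, false);

    GenericRow row1 = new GenericRow();
    row1.putValue("memberId", 456L);
    GenericRow row2 = new GenericRow();
    row2.putValue("memberId", 123L);

    // Write both rows back to back into one buffer, tracking each row's offset
    byte[] bytes1 = serializer.serialize(row1);
    byte[] bytes2 = serializer.serialize(row2);
    PinotDataBuffer dataBuffer = PinotDataBuffer
        .allocateDirect(bytes1.length + bytes2.length, PinotDataBuffer.NATIVE_ORDER, null);
    dataBuffer.readFrom(0L, bytes1);
    dataBuffer.readFrom(bytes1.length, bytes2);

    // Compare the rows by reading only the sort column, without full deserialization
    GenericRowDeserializer deserializer = new GenericRowDeserializer(dataBuffer, fieldSpecs, false);
    SortOrderComparator comparator = new SortOrderComparator(1, new DataType[]{DataType.LONG});
    int result = comparator.compare(
        deserializer.partialDeserialize(0L, 1),
        deserializer.partialDeserialize(bytes1.length, 1));
    // result > 0: row1 (456) sorts after row2 (123)
  }
}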