This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b1c3e38da3 BugFix: Handle BYTES column type for partitioning. (#10110) b1c3e38da3 is described below commit b1c3e38da368b3aef543809edc3a4ddc5e44ac96 Author: Mayank Shrivastava <maya...@apache.org> AuthorDate: Wed Jan 11 13:38:49 2023 -0800 BugFix: Handle BYTES column type for partitioning. (#10110) The MurmurPartition function does not work with the byte[] data type, as it first converts the value to a String (`value.toString().getBytes(UTF_8)`). In the offline flow, we already convert byte[] values to ByteArray so that the MurmurPartition function handles them correctly. This PR updates the real-time ingestion path to perform the same conversion to ByteArray. --- .../local/indexsegment/mutable/MutableSegmentImpl.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java index 456b95d275..0efa6395e5 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java @@ -349,8 +349,8 @@ public class MutableSegmentImpl implements MutableSegment { // it is beyond the scope of realtime index pluggability to do this refactoring, so realtime // text indexes remain statically defined. Revisit this after this refactoring has been done. 
RealtimeLuceneTextIndex luceneTextIndex = - new RealtimeLuceneTextIndex(column, new File(config.getConsumerDir()), _segmentName, - stopWordsInclude, stopWordsExclude); + new RealtimeLuceneTextIndex(column, new File(config.getConsumerDir()), _segmentName, stopWordsInclude, + stopWordsExclude); if (_realtimeLuceneReaders == null) { _realtimeLuceneReaders = new RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders(_segmentName); } @@ -640,15 +640,20 @@ public class MutableSegmentImpl implements MutableSegment { // results, hence the extra care. A metric will already have been emitted when trying to update the dictionary. continue; } + FieldSpec fieldSpec = indexContainer._fieldSpec; + DataType dataType = fieldSpec.getDataType(); + if (fieldSpec.isSingleValueField()) { // Single-value column // Check partitions if (column.equals(_partitionColumn)) { - int partition = _partitionFunction.getPartition(value); + Object valueToPartition = (dataType == BYTES) ? new ByteArray((byte[]) value) : value; + int partition = _partitionFunction.getPartition(valueToPartition); if (indexContainer._partitions.add(partition)) { - _logger.warn("Found new partition: {} from partition column: {}, value: {}", partition, column, value); + _logger.warn("Found new partition: {} from partition column: {}, value: {}", partition, column, + valueToPartition); if (_serverMetrics != null) { _serverMetrics.addMeteredTableValue(_realtimeTableName, ServerMeter.REALTIME_PARTITION_MISMATCH, 1); } @@ -680,7 +685,6 @@ public class MutableSegmentImpl implements MutableSegment { // Single-value column with raw index // Update forward index - DataType dataType = fieldSpec.getDataType(); switch (dataType.getStoredType()) { case INT: forwardIndex.setInt(docId, (Integer) value); @@ -787,8 +791,6 @@ public class MutableSegmentImpl implements MutableSegment { } else { // Raw MV columns - // Update forward index and numValues info - DataType dataType = fieldSpec.getDataType(); switch (dataType.getStoredType()) { 
case INT: Object[] values = (Object[]) value; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org