This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6dbd9e23fe Keeps nullness attributes of merged in comparison column values (#10704) 6dbd9e23fe is described below commit 6dbd9e23fef8d326fa2f2467ae8a10e2b3fdb428 Author: Evan Galpin <egal...@users.noreply.github.com> AuthorDate: Thu Jun 1 10:45:07 2023 -0700 Keeps nullness attributes of merged in comparison column values (#10704) --- .../segment/local/upsert/ComparisonColumns.java | 41 ++++- .../segment/local/upsert/PartialUpsertHandler.java | 7 +- .../pinot/segment/local/upsert/UpsertUtils.java | 10 +- .../mutable/MutableSegmentImplUpsertTest.java | 3 + .../local/upsert/ComparisonColumnsTest.java | 171 +++++++++++++++++++++ 5 files changed, 225 insertions(+), 7 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java index 5d40e5a350..174e441a20 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java @@ -23,6 +23,7 @@ package org.apache.pinot.segment.local.upsert; public class ComparisonColumns implements Comparable<ComparisonColumns> { private final Comparable[] _values; private final int _comparableIndex; + public static final int SEALED_SEGMENT_COMPARISON_INDEX = -1; public ComparisonColumns(Comparable[] values, int comparableIndex) { _values = values; @@ -37,10 +38,46 @@ public class ComparisonColumns implements Comparable<ComparisonColumns> { return _comparableIndex; } + public int compareToSealed(ComparisonColumns other) { + /* + - iterate over all columns + - if any value in _values is greater than its counterpart in _other._values, keep _values as-is and return 1 + - if any value in _values is less than its counterpart in _other._values, keep _values as-is and return -1 + - 
if all values between the two sets of Comparables are equal (compareTo == 0), keep _values as-is and return 0 + */ + for (int i = 0; i < _values.length; i++) { + Comparable comparisonValue = _values[i]; + Comparable otherComparisonValue = other.getValues()[i]; + if (comparisonValue == null && otherComparisonValue == null) { + continue; + } + + // Always keep the record with non-null value, or that with the greater comparisonResult + if (comparisonValue == null) { + // implies comparisonValue == null && otherComparisonValue != null + return -1; + } else if (otherComparisonValue == null) { + // implies comparisonValue != null && otherComparisonValue == null + return 1; + } else { + int comparisonResult = comparisonValue.compareTo(otherComparisonValue); + if (comparisonResult != 0) { + return comparisonResult; + } + } + } + return 0; + } + @Override public int compareTo(ComparisonColumns other) { - // _comparisonColumns should only at most one non-null comparison value. If not, it is the user's responsibility. - // There is no attempt to guarantee behavior in the case where there are multiple non-null values + if (_comparableIndex == SEALED_SEGMENT_COMPARISON_INDEX) { + return compareToSealed(other); + } + + // _comparisonColumns should only have at most one non-null comparison value for newly ingested data. If not, it is + // the user's responsibility.
There is no attempt to guarantee behavior in the case where there are multiple + // non-null values int comparisonResult; Comparable comparisonValue = _values[_comparableIndex]; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java index 86c268e509..483ec6d6bb 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java @@ -74,7 +74,12 @@ public class PartialUpsertHandler { // comparison column values from the previous record, and the sole non-null comparison column value from // the new record. newRecord.putValue(column, previousRecord.getValue(column)); - newRecord.removeNullValueField(column); + if (!_comparisonColumns.contains(column)) { + // Despite wanting to overwrite the values to comparison columns from prior records, we want to + // preserve for _this_ record which comparison column was non-null. 
Doing so will allow us to + // re-evaluate the same comparisons when reading a segment and during steady-state stream ingestion + newRecord.removeNullValueField(column); + } } else if (!_comparisonColumns.contains(column)) { PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger); newRecord.putValue(column, diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java index acda88a27b..6a7d3ed626 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java @@ -180,6 +180,7 @@ public class UpsertUtils { public MultiComparisonColumnReader(IndexSegment segment, List<String> comparisonColumns) { _comparisonColumnReaders = new PinotSegmentColumnReader[comparisonColumns.size()]; + for (int i = 0; i < comparisonColumns.size(); i++) { _comparisonColumnReaders[i] = new PinotSegmentColumnReader(segment, comparisonColumns.get(i)); } @@ -190,13 +191,14 @@ public class UpsertUtils { for (int i = 0; i < _comparisonColumnReaders.length; i++) { PinotSegmentColumnReader columnReader = _comparisonColumnReaders[i]; - Comparable comparisonValue = (Comparable) UpsertUtils.getValue(columnReader, docId); + Comparable comparisonValue = null; + if (!columnReader.isNull(docId)) { + comparisonValue = (Comparable) UpsertUtils.getValue(columnReader, docId); + } comparisonColumns[i] = comparisonValue; } - // Note that the comparable index is negative here to indicate that this instance could be the argument to - // ComparisonColumns#compareTo, but should never call compareTo itself. 
- return new ComparisonColumns(comparisonColumns, -1); + return new ComparisonColumns(comparisonColumns, ComparisonColumns.SEALED_SEGMENT_COMPARISON_INDEX); } @Override diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java index c8b38db61e..1138d891c5 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java @@ -76,6 +76,7 @@ public class MutableSegmentImplUpsertTest { _schema = Schema.fromFile(new File(schemaResourceUrl.getFile())); _tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfigWithHash) + .setNullHandlingEnabled(true) .build(); _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema); File jsonFile = new File(dataResourceUrl.getFile()); @@ -138,6 +139,7 @@ public class MutableSegmentImplUpsertTest { // Confirm that both comparison column values have made it into the persisted upserted doc Assert.assertEquals(1567205397L, _mutableSegmentImpl.getValue(2, "secondsSinceEpoch")); Assert.assertEquals(1567205395L, _mutableSegmentImpl.getValue(2, "otherComparisonColumn")); + Assert.assertTrue(_mutableSegmentImpl.getDataSource("secondsSinceEpoch").getNullValueVector().isNull(2)); // bb Assert.assertFalse(bitmap.contains(4)); @@ -146,6 +148,7 @@ public class MutableSegmentImplUpsertTest { // Confirm that comparison column values have made it into the persisted upserted doc Assert.assertEquals(1567205396L, _mutableSegmentImpl.getValue(5, "secondsSinceEpoch")); Assert.assertEquals(Long.MIN_VALUE, _mutableSegmentImpl.getValue(5, "otherComparisonColumn")); + 
Assert.assertTrue(_mutableSegmentImpl.getDataSource("otherComparisonColumn").getNullValueVector().isNull(5)); } } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ComparisonColumnsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ComparisonColumnsTest.java new file mode 100644 index 0000000000..2fae498b44 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ComparisonColumnsTest.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert; + +import java.util.Arrays; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class ComparisonColumnsTest { + private void nullFill(Comparable[]... 
comparables) { + for (Comparable[] comps : comparables) { + Arrays.fill(comps, null); + } + } + + @Test + public void testRealtimeComparison() { + Comparable[] newComparables = new Comparable[3]; + Comparable[] persistedComparables = new Comparable[3]; + ComparisonColumns alreadyPersisted = new ComparisonColumns(persistedComparables, 0); + ComparisonColumns toBeIngested = new ComparisonColumns(newComparables, 0); + + // reject same col with smaller value + newComparables[0] = 1; + persistedComparables[0] = 2; + int comparisonResult = toBeIngested.compareTo(alreadyPersisted); + Assert.assertEquals(comparisonResult, -1); + + // persist same col with equal value + nullFill(newComparables, persistedComparables); + newComparables[0] = 2; + persistedComparables[0] = 2; + comparisonResult = toBeIngested.compareTo(alreadyPersisted); + Assert.assertEquals(comparisonResult, 0); + + // persist same col with larger value + nullFill(newComparables, persistedComparables); + newComparables[0] = 2; + persistedComparables[0] = 1; + comparisonResult = toBeIngested.compareTo(alreadyPersisted); + Assert.assertEquals(comparisonResult, 1); + Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{2, null, null}); + + // persist doc with col which was previously null, even though its value is smaller than the previous non-null col + nullFill(newComparables, persistedComparables); + toBeIngested = new ComparisonColumns(newComparables, newComparables.length - 1); + newComparables[newComparables.length - 1] = 1; + persistedComparables[0] = 2; + comparisonResult = toBeIngested.compareTo(alreadyPersisted); + Assert.assertEquals(comparisonResult, 1); + Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{2, null, 1}); + + // persist new doc where existing doc has multiple non-null comparison values + nullFill(newComparables, persistedComparables); + toBeIngested = new ComparisonColumns(newComparables, 1); + newComparables[1] = 2; + Arrays.fill(persistedComparables, 1); + 
comparisonResult = toBeIngested.compareTo(alreadyPersisted); + Assert.assertEquals(comparisonResult, 1); + Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{1, 2, 1}); + + // reject new doc where existing doc has multiple non-null comparison values + nullFill(newComparables, persistedComparables); + newComparables[1] = 0; + Arrays.fill(persistedComparables, 1); + comparisonResult = toBeIngested.compareTo(alreadyPersisted); + Assert.assertEquals(comparisonResult, -1); + } + + @Test + public void testSealedComparison() { + // Remember to be cognizant of which scenarios are _actually_ possible in a sealed segment. The way in which docs + // are compared during realtime ingestion dictates the possible scenarios of persisted rows. Ex. it is not + // possible for 2 docs with the same primary key to have a mutually exclusive set of non-null values; if such a + // scenario arose during realtime ingestion, the values would be merged such that the newly persisted doc would + // have all non-null comparison values. We should avoid making tests pass for scenarios that are not intended to + // be supported. 
+ Comparable[] newComparables = new Comparable[3]; + Comparable[] persistedComparables = new Comparable[3]; + ComparisonColumns alreadyPersisted = + new ComparisonColumns(persistedComparables, ComparisonColumns.SEALED_SEGMENT_COMPARISON_INDEX); + ComparisonColumns toBeIngested = + new ComparisonColumns(newComparables, ComparisonColumns.SEALED_SEGMENT_COMPARISON_INDEX); + + // reject same col with smaller value + newComparables[0] = 1; + persistedComparables[0] = 2; + int comparisonResult = toBeIngested.compareTo(alreadyPersisted); + Assert.assertEquals(comparisonResult, -1); + + // persist same col with equal value + nullFill(newComparables, persistedComparables); + newComparables[0] = 2; + persistedComparables[0] = 2; + comparisonResult = toBeIngested.compareTo(alreadyPersisted); + Assert.assertEquals(comparisonResult, 0); + // Verify unchanged comparables in the case of SEALED comparison + Assert.assertEquals(toBeIngested.getValues(), newComparables); + + // persist same col with larger value + nullFill(newComparables, persistedComparables); + newComparables[0] = 2; + persistedComparables[0] = 1; + comparisonResult = toBeIngested.compareTo(alreadyPersisted); + Assert.assertEquals(comparisonResult, 1); + + // reject doc where existing doc has more than one, but not all, non-null comparison values, but _this_ doc has 2 + // null columns. The presence of null columns in one of the docs implies that it must have come before the doc + // with non-null columns. 
+ nullFill(newComparables, persistedComparables); + newComparables[1] = 1; + persistedComparables[0] = 1; + persistedComparables[2] = 1; + comparisonResult = toBeIngested.compareTo(alreadyPersisted); + Assert.assertEquals(comparisonResult, -1); + + // persist doc where existing doc has more than one, but not all, non-null comparison values, but _this_ doc has the same columns populated with a larger value in one of them + nullFill(newComparables, persistedComparables); + newComparables[0] = 1; + newComparables[2] = 2; + persistedComparables[0] = 1; + persistedComparables[2] = 1; + comparisonResult = toBeIngested.compareTo(alreadyPersisted); + Assert.assertEquals(comparisonResult, 1); + + // persist doc with non-null value where existing doc had null value in same column previously (but multiple + // non-null in other columns) + nullFill(newComparables, persistedComparables); + newComparables[0] = 1; + newComparables[1] = 1; + newComparables[2] = 1; + persistedComparables[0] = 1; + persistedComparables[2] = 1; + comparisonResult = toBeIngested.compareTo(alreadyPersisted); + Assert.assertEquals(comparisonResult, 1); + + // reject doc where existing doc has all non-null comparison values, but _this_ doc has 2 null values. + // The presence of null columns in one of the docs implies that it must have come before the doc with non-null + // columns. + nullFill(newComparables, persistedComparables); + newComparables[1] = 1; + Arrays.fill(persistedComparables, 1); + comparisonResult = toBeIngested.compareTo(alreadyPersisted); + Assert.assertEquals(comparisonResult, -1); + + // Persist doc where existing doc has all non-null comparison values, but _this_ doc has a larger value.
+ nullFill(newComparables, persistedComparables); + Arrays.fill(newComparables, 1); + Arrays.fill(persistedComparables, 1); + newComparables[1] = 2; + comparisonResult = toBeIngested.compareTo(alreadyPersisted); + Assert.assertEquals(comparisonResult, 1); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org