This is an automated email from the ASF dual-hosted git repository. yupeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d317c59 Add global strategy for partial upsert (#7906) d317c59 is described below commit d317c5909454a3732716f24008d2bcf96305ef90 Author: deemoliu <qiao...@uber.com> AuthorDate: Wed Jan 19 16:44:09 2022 -0800 Add global strategy for partial upsert (#7906) * Add global strategy for partial upsert * fix UT setup * try fix lint * fix tests * handle empty globalUpsertStrategy * update defaultValue for full upsert to be null * update _globalUpsertStrategy to _defaultPartialUpsertStrategy * try fix lint * fix checkstyle * add taskConfig test setup code * include all physical columns (including date time columns) except for primary key columns and comparison column * fix partial upsert handler merge tests * Annotate comparison column as nullable, use main time column * simplified partialUpsertHandler (comparison column is non-null) * fix checkstyle --- .../common/utils/config/TableConfigSerDeTest.java | 9 +- .../controller/helix/PinotResourceManagerTest.java | 2 +- .../manager/realtime/RealtimeTableDataManager.java | 9 +- .../tests/BaseClusterIntegrationTest.java | 2 +- .../segment/local/upsert/PartialUpsertHandler.java | 14 ++- .../MutableSegmentImplUpsertComparisonColTest.java | 4 +- .../mutable/MutableSegmentImplUpsertTest.java | 4 +- .../local/upsert/PartialUpsertHandlerTest.java | 71 ++++++++++++- .../segment/local/utils/TableConfigUtilsTest.java | 110 ++++++++++----------- .../pinot/spi/config/table/UpsertConfig.java | 11 +++ .../pinot/spi/config/table/UpsertConfigTest.java | 11 ++- 11 files changed, 170 insertions(+), 77 deletions(-) diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java index 17ce0dd..164b978 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java @@ -228,10 +228,8 @@ public class TableConfigSerDeTest { Map<String, String> properties = new HashMap<>(); properties.put("foo", "bar"); properties.put("foobar", "potato"); - List<FieldConfig> fieldConfigList = Arrays.asList( - new FieldConfig("column1", FieldConfig.EncodingType.DICTIONARY, Lists.newArrayList( - FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE), null, - properties), + List<FieldConfig> fieldConfigList = Arrays.asList(new FieldConfig("column1", FieldConfig.EncodingType.DICTIONARY, + Lists.newArrayList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE), null, properties), new FieldConfig("column2", null, Collections.emptyList(), null, null), new FieldConfig("column3", FieldConfig.EncodingType.RAW, Collections.emptyList(), FieldConfig.CompressionCodec.SNAPPY, null)); @@ -251,7 +249,8 @@ public class TableConfigSerDeTest { { // with upsert config UpsertConfig upsertConfig = - new UpsertConfig(UpsertConfig.Mode.FULL, null, "comparison", UpsertConfig.HashFunction.NONE); + new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "comparison", + UpsertConfig.HashFunction.NONE); TableConfig tableConfig = tableConfigBuilder.setUpsertConfig(upsertConfig).build(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java index a0938d3..56cc48b 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java @@ -68,7 +68,7 @@ public class PinotResourceManagerTest { realtimeTableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_STRING); realtimeTableConfig.getValidationConfig() .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1)); - realtimeTableConfig.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)); + realtimeTableConfig.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)); ControllerTestUtils.getHelixResourceManager().addTable(realtimeTableConfig); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index b74a9d7..7ca8275 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -164,8 +164,13 @@ public class RealtimeTableDataManager extends BaseTableDataManager { PartialUpsertHandler partialUpsertHandler = null; if (isPartialUpsertEnabled()) { - partialUpsertHandler = - new PartialUpsertHandler(_helixManager, _tableNameWithType, upsertConfig.getPartialUpsertStrategies()); + String comparisonColumn = upsertConfig.getComparisonColumn(); + if (comparisonColumn == null) { + comparisonColumn = tableConfig.getValidationConfig().getTimeColumnName(); + } + partialUpsertHandler = new PartialUpsertHandler(_helixManager, _tableNameWithType, schema, + upsertConfig.getPartialUpsertStrategies(), upsertConfig.getDefaultPartialUpsertStrategy(), + comparisonColumn); } UpsertConfig.HashFunction hashFunction = upsertConfig.getHashFunction(); _tableUpsertMetadataManager = diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index b98c7c4..a453fdc 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -386,7 +386,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap)) .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1)) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).build(); + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)).build(); } /** diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java index 7bc6e6a..9a23b73 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java @@ -29,6 +29,7 @@ import org.apache.helix.model.LiveInstance; import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger; import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.slf4j.Logger; @@ -48,13 +49,22 @@ public class PartialUpsertHandler { private final String _tableNameWithType; private boolean _allSegmentsLoaded; - public PartialUpsertHandler(HelixManager helixManager, String tableNameWithType, - Map<String, UpsertConfig.Strategy> partialUpsertStrategies) { + public PartialUpsertHandler(HelixManager helixManager, String tableNameWithType, Schema schema, + Map<String, UpsertConfig.Strategy> partialUpsertStrategies, UpsertConfig.Strategy defaultPartialUpsertStrategy, + String comparisonColumn) { _helixManager = helixManager; _tableNameWithType = tableNameWithType; for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) { _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue())); } + // For all physical columns (including date time columns) except for primary key columns and comparison column. + // If no comparison column is configured, use main time column as the comparison time. + for (String columnName : schema.getPhysicalColumnNames()) { + if (!schema.getPrimaryKeyColumns().contains(columnName) && !_column2Mergers.containsKey(columnName) + && !comparisonColumn.equals(columnName)) { + _column2Mergers.put(columnName, PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy)); + } + } } /** diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java index a65e7a9..57f1c83 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java @@ -57,7 +57,7 @@ public class MutableSegmentImplUpsertComparisonColTest { URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH); _schema = Schema.fromFile(new File(schemaResourceUrl.getFile())); _tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testTable") - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, "offset", null)).build(); + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "offset", null)).build(); _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema); File jsonFile = new File(dataResourceUrl.getFile()); _partitionUpsertMetadataManager = @@ -65,7 +65,7 @@ public class MutableSegmentImplUpsertComparisonColTest { UpsertConfig.HashFunction.NONE).getOrCreatePartitionManager(0); _mutableSegmentImpl = MutableSegmentImplTestUtils .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), - false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, "offset", null), "secondsSinceEpoch", + false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "offset", null), "secondsSinceEpoch", _partitionUpsertMetadataManager); GenericRow reuse = new GenericRow(); try (RecordReader recordReader = RecordReaderFactory diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java index fc8b230..0bcbb01 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java @@ -55,7 +55,7 @@ public class MutableSegmentImplUpsertTest { URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH); _schema = Schema.fromFile(new File(schemaResourceUrl.getFile())); _tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testTable") - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, hashFunction)).build(); + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, hashFunction)).build(); _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema); File jsonFile = new File(dataResourceUrl.getFile()); _partitionUpsertMetadataManager = @@ -63,7 +63,7 @@ public class MutableSegmentImplUpsertTest { .getOrCreatePartitionManager(0); _mutableSegmentImpl = MutableSegmentImplTestUtils .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), - false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, null, hashFunction), "secondsSinceEpoch", + false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, hashFunction), "secondsSinceEpoch", _partitionUpsertMetadataManager); GenericRow reuse = new GenericRow(); try (RecordReader recordReader = RecordReaderFactory diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java index 5335d51..471a46c 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java @@ -18,10 +18,13 @@ */ package org.apache.pinot.segment.local.upsert; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import org.apache.helix.HelixManager; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -36,10 +39,18 @@ public class PartialUpsertHandlerTest { @Test public void testMerge() { HelixManager helixManager = Mockito.mock(HelixManager.class); + + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING) + .addSingleValueDimension("field1", FieldSpec.DataType.LONG) + .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS") + .setPrimaryKeyColumns(Arrays.asList("pk")).build(); + String realtimeTableName = "testTable_REALTIME"; Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>(); partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT); - PartialUpsertHandler handler = new PartialUpsertHandler(helixManager, realtimeTableName, partialUpsertStrategies); + PartialUpsertHandler handler = + new PartialUpsertHandler(helixManager, realtimeTableName, schema, partialUpsertStrategies, + UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch"); // both records are null. GenericRow previousRecord = new GenericRow(); @@ -61,13 +72,18 @@ public class PartialUpsertHandlerTest { assertEquals(newRecord.getValue("field1"), 2); // newRecord is default null value, while previousRecord is not. + // field1 should not be incremented since the newRecord is null. + // special case: field2 should be overrided by null value because we didn't enabled default partial upsert strategy. previousRecord.clear(); incomingRecord.clear(); previousRecord.putValue("field1", 1); + previousRecord.putValue("field2", 2); incomingRecord.putDefaultNullValue("field1", 2); + incomingRecord.putDefaultNullValue("field2", 0); newRecord = handler.merge(previousRecord, incomingRecord); assertFalse(newRecord.isNullValue("field1")); assertEquals(newRecord.getValue("field1"), 1); + assertTrue(newRecord.isNullValue("field2")); // neither of records is null. previousRecord.clear(); @@ -78,4 +94,57 @@ public class PartialUpsertHandlerTest { assertFalse(newRecord.isNullValue("field1")); assertEquals(newRecord.getValue("field1"), 3); } + + @Test + public void testMergeWithDefaultPartialUpsertStrategy() { + HelixManager helixManager = Mockito.mock(HelixManager.class); + + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING) + .addSingleValueDimension("field1", FieldSpec.DataType.LONG).addMetric("field2", FieldSpec.DataType.LONG) + .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS") + .setPrimaryKeyColumns(Arrays.asList("pk")).build(); + + String realtimeTableName = "testTable_REALTIME"; + Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>(); + partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT); + PartialUpsertHandler handler = + new PartialUpsertHandler(helixManager, realtimeTableName, schema, partialUpsertStrategies, + UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch"); + + // previousRecord is null default value, while newRecord is not. + GenericRow previousRecord = new GenericRow(); + GenericRow incomingRecord = new GenericRow(); + previousRecord.putDefaultNullValue("field1", 1); + previousRecord.putDefaultNullValue("field2", 2); + incomingRecord.putValue("field1", 2); + incomingRecord.putValue("field2", 1); + GenericRow newRecord = handler.merge(previousRecord, incomingRecord); + assertFalse(newRecord.isNullValue("field1")); + assertEquals(newRecord.getValue("field1"), 2); + assertEquals(newRecord.getValue("field2"), 1); + + // newRecord is default null value, while previousRecord is not. + // field1 should not be incremented since the newRecord is null. + // field2 should not be overrided by null value since we have default partial upsert strategy. + previousRecord.clear(); + incomingRecord.clear(); + previousRecord.putValue("field1", 8); + previousRecord.putValue("field2", 8); + incomingRecord.putDefaultNullValue("field1", 1); + incomingRecord.putDefaultNullValue("field2", 0); + newRecord = handler.merge(previousRecord, incomingRecord); + assertEquals(newRecord.getValue("field1"), 8); + assertEquals(newRecord.getValue("field2"), 8); + + // neither of records is null. + previousRecord.clear(); + incomingRecord.clear(); + previousRecord.putValue("field1", 1); + previousRecord.putValue("field2", 100); + incomingRecord.putValue("field1", 2); + incomingRecord.putValue("field2", 1000); + newRecord = handler.merge(previousRecord, incomingRecord); + assertEquals(newRecord.getValue("field1"), 3); + assertEquals(newRecord.getValue("field2"), 1000); + } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index eacb587..928a01a 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -357,8 +357,7 @@ public class TableConfigUtilsTest { // input field name used as destination field tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig( - new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("myCol", "reverse(myCol)")), - null)) + new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("myCol", "reverse(myCol)")), null)) .build(); try { TableConfigUtils.validate(tableConfig, schema); @@ -505,14 +504,14 @@ public class TableConfigUtilsTest { TableConfigUtils.validate(tableConfig, schema); // 1 tier configs - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList( - Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists + .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null))).build(); TableConfigUtils.validate(tableConfig, schema); // 2 tier configs, case insensitive check - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList( - Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE.toLowerCase(), "30d", + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists + .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE.toLowerCase(), "30d", TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null), new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d", TierFactory.PINOT_SERVER_STORAGE_TYPE.toLowerCase(), "tier2_tag_OFFLINE", null, null))).build(); @@ -527,8 +526,8 @@ public class TableConfigUtilsTest { TableConfigUtils.validate(tableConfig, schema); // tier name empty - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList( - Lists.newArrayList( + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists + .newArrayList( new TierConfig("", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null))).build(); try { @@ -539,8 +538,8 @@ public class TableConfigUtilsTest { } // tier name repeats - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList( - Lists.newArrayList(new TierConfig("sameTierName", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists + .newArrayList(new TierConfig("sameTierName", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null), new TierConfig("sameTierName", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "100d", TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build(); @@ -552,8 +551,8 @@ public class TableConfigUtilsTest { } // segmentSelectorType invalid - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList( - Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists + .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null), new TierConfig("tier2", "unsupportedSegmentSelector", "40d", TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build(); @@ -565,8 +564,8 @@ public class TableConfigUtilsTest { } // segmentAge not provided for TIME segmentSelectorType - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList( - Lists.newArrayList( + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists + .newArrayList( new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, null, TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null), new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d", TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build(); @@ -578,8 +577,8 @@ public class TableConfigUtilsTest { } // segmentAge invalid - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList( - Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists + .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null), new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "3600", TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build(); @@ -592,11 +591,10 @@ public class TableConfigUtilsTest { } // storageType invalid - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList( - Lists.newArrayList( - new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", "unsupportedStorageType", - "tier1_tag_OFFLINE", null, null), new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d", - TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build(); + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists + .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", "unsupportedStorageType", + "tier1_tag_OFFLINE", null, null), new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d", + TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build(); try { TableConfigUtils.validate(tableConfig, schema); @@ -606,8 +604,8 @@ public class TableConfigUtilsTest { } // serverTag not provided for PINOT_SERVER storageType - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList( - Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists + .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null), new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d", TierFactory.PINOT_SERVER_STORAGE_TYPE, null, null, null))).build(); @@ -619,8 +617,8 @@ public class TableConfigUtilsTest { } // serverTag invalid - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList( - Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists + .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag", null, null), new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d", TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build(); @@ -1046,7 +1044,7 @@ public class TableConfigUtilsTest { new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING) .build(); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).build(); + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)).build(); try { TableConfigUtils.validateUpsertConfig(tableConfig, schema); Assert.fail(); @@ -1055,7 +1053,7 @@ public class TableConfigUtilsTest { } tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).build(); + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)).build(); try { TableConfigUtils.validateUpsertConfig(tableConfig, schema); Assert.fail(); @@ -1070,14 +1068,14 @@ public class TableConfigUtilsTest { TableConfigUtils.validateUpsertConfig(tableConfig, schema); Assert.fail(); } catch (IllegalStateException e) { - Assert.assertEquals(e.getMessage(), - "Could not find streamConfigs for REALTIME table: " + TABLE_NAME + "_REALTIME"); + Assert + .assertEquals(e.getMessage(), "Could not find streamConfigs for REALTIME table: " + TABLE_NAME + "_REALTIME"); } Map<String, String> streamConfigs = getStreamConfigs(); tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).setStreamConfigs(streamConfigs) - .build(); + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)) + .setStreamConfigs(streamConfigs).build(); try { TableConfigUtils.validateUpsertConfig(tableConfig, schema); Assert.fail(); @@ -1087,8 +1085,8 @@ public class TableConfigUtilsTest { streamConfigs.put("stream.kafka.consumer.type", "simple"); tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).setStreamConfigs(streamConfigs) - .build(); + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)) + .setStreamConfigs(streamConfigs).build(); try { TableConfigUtils.validateUpsertConfig(tableConfig, schema); Assert.fail(); @@ -1098,16 +1096,15 @@ public class TableConfigUtilsTest { } tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)) .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) .setStreamConfigs(streamConfigs).build(); TableConfigUtils.validateUpsertConfig(tableConfig, schema); - StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Lists.newArrayList("myCol"), null, - Collections.singletonList( - new AggregationFunctionColumnPair(AggregationFunctionType.COUNT, "myCol").toColumnName()), 10); + StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Lists.newArrayList("myCol"), null, Collections + .singletonList(new AggregationFunctionColumnPair(AggregationFunctionType.COUNT, "myCol").toColumnName()), 10); tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)) .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) .setStarTreeIndexConfigs(Lists.newArrayList(starTreeIndexConfig)).setStreamConfigs(streamConfigs).build(); try { @@ -1131,9 +1128,9 @@ public class TableConfigUtilsTest { Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new HashMap<>(); partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT); - TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies, null, null)) - .setNullHandlingEnabled(false) + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig( + new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies, UpsertConfig.Strategy.OVERWRITE, null, + null)).setNullHandlingEnabled(false) .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) .setStreamConfigs(streamConfigs).build(); try { @@ -1191,18 +1188,17 @@ public class TableConfigUtilsTest { @Test public void testTaskConfig() { Schema schema = - new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) - .addSingleValueDimension("myCol", FieldSpec.DataType.STRING) + new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING) .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS") - .setPrimaryKeyColumns(Lists.newArrayList("myCol")) - .build(); - Map<String, String> realtimeToOfflineTaskConfig = - ImmutableMap.of("schedule", "0 */10 * ? * * *", "bucketTimePeriod", "6h", "bufferTimePeriod", "5d", "mergeType", - "rollup", "myCol.aggregationType", "max"); + .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build(); + Map<String, String> realtimeToOfflineTaskConfig = ImmutableMap + .of("schedule", "0 */10 * ? * * *", "bucketTimePeriod", "6h", "bufferTimePeriod", "5d", "mergeType", "rollup", + "myCol.aggregationType", "max"); Map<String, String> segmentGenerationAndPushTaskConfig = ImmutableMap.of("schedule", "0 */10 * ? * * *"); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig( - new TableTaskConfig(ImmutableMap.of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig, - "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build(); + new TableTaskConfig(ImmutableMap + .of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig, "SegmentGenerationAndPushTask", + segmentGenerationAndPushTaskConfig))).build(); // validate valid config TableConfigUtils.validateTaskConfigs(tableConfig, schema); @@ -1221,11 +1217,11 @@ public class TableConfigUtilsTest { } // invalid Upsert config with RealtimeToOfflineTask - tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setTimeColumnName(TIME_COLUMN) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).setTaskConfig(new TableTaskConfig( - ImmutableMap.of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig, - "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build(); + tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)).setTaskConfig( + new TableTaskConfig(ImmutableMap + .of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig, "SegmentGenerationAndPushTask", + segmentGenerationAndPushTaskConfig))).build(); try { TableConfigUtils.validateTaskConfigs(tableConfig, schema); Assert.fail(); @@ -1293,8 +1289,8 @@ public class TableConfigUtilsTest { streamConfigs.put("streamType", "kafka"); streamConfigs.put("stream.kafka.consumer.type", "highLevel"); streamConfigs.put("stream.kafka.topic.name", "test"); - streamConfigs.put("stream.kafka.decoder.class.name", - "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"); + streamConfigs + .put("stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"); return streamConfigs; } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java index 9184bff..5afbe9c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java @@ -53,12 +53,16 @@ public class UpsertConfig extends BaseJsonConfig { @JsonPropertyDescription("Partial update strategies.") private final Map<String, Strategy> _partialUpsertStrategies; + @JsonPropertyDescription("default upsert strategy for partial mode") + private final Strategy _defaultPartialUpsertStrategy; + @JsonPropertyDescription("Column for upsert comparison, default to time column") private final String _comparisonColumn; @JsonCreator public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode mode, @JsonProperty("partialUpsertStrategies") @Nullable Map<String, Strategy> partialUpsertStrategies, + @JsonProperty("defaultPartialUpsertStrategy") @Nullable Strategy defaultPartialUpsertStrategy, @JsonProperty("comparisonColumn") @Nullable String comparisonColumn, @JsonProperty("hashFunction") @Nullable HashFunction hashFunction) { Preconditions.checkArgument(mode != null, "Upsert mode must be configured"); @@ -66,8 +70,11 @@ public class UpsertConfig extends BaseJsonConfig { if (mode == Mode.PARTIAL) { _partialUpsertStrategies = partialUpsertStrategies != null ? partialUpsertStrategies : new HashMap<>(); + _defaultPartialUpsertStrategy = + defaultPartialUpsertStrategy != null ? defaultPartialUpsertStrategy : Strategy.OVERWRITE; } else { _partialUpsertStrategies = null; + _defaultPartialUpsertStrategy = null; } _comparisonColumn = comparisonColumn; @@ -87,6 +94,10 @@ public class UpsertConfig extends BaseJsonConfig { return _partialUpsertStrategies; } + public Strategy getDefaultPartialUpsertStrategy() { + return _defaultPartialUpsertStrategy; + } + public String getComparisonColumn() { return _comparisonColumn; } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java index 37b18a2..d7d53d2 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java @@ -29,18 +29,21 @@ public class UpsertConfigTest { @Test public void testUpsertConfig() { - UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null); + UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null); assertEquals(upsertConfig1.getMode(), UpsertConfig.Mode.FULL); - upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, "comparison", null); + upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "comparison", null); assertEquals(upsertConfig1.getComparisonColumn(), "comparison"); - upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, "comparison", UpsertConfig.HashFunction.MURMUR3); + upsertConfig1 = + new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "comparison", UpsertConfig.HashFunction.MURMUR3); assertEquals(upsertConfig1.getHashFunction(), UpsertConfig.HashFunction.MURMUR3); Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new HashMap<>(); partialUpsertStratgies.put("myCol", UpsertConfig.Strategy.INCREMENT); - UpsertConfig upsertConfig2 = new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies, null, null); + UpsertConfig upsertConfig2 = + new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies, UpsertConfig.Strategy.OVERWRITE, null, + null); assertEquals(upsertConfig2.getPartialUpsertStrategies(), partialUpsertStratgies); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org