deemoliu commented on a change in pull request #6899:
URL: https://github.com/apache/incubator-pinot/pull/6899#discussion_r650307998



##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
##########
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert.merger;
+
+import org.apache.pinot.spi.config.table.UpsertConfig;
+
+
+public class PartialUpsertMergerFactory {
+  private PartialUpsertMergerFactory() {
+  }
+
+  private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger();
+  private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger();
+
+  public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) {
+    switch (strategy) {
+      case OVERWRITE:
+        return OVERWRITE_MERGER;
+      case INCREMENT:
+        return INCREMENT_MERGER;

Review comment:
       Hi @yupeng9, I added a merger test for null values here:
https://github.com/apache/incubator-pinot/pull/6899/commits/ec409cb739ffbe42b424003596cf9443a31d8ebf
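For readers following the null-value discussion, here is a minimal, self-contained sketch of how a strategy-to-merger factory like the one in the diff might behave. It is not the PR's code: the classes reuse the diff's names but are hypothetical stand-ins, and the `merge(previousValue, newValue)` signature, the rule that a null incoming value keeps the previous value, and the per-type arithmetic in the increment merger are all assumptions made for illustration. The `default` branch makes an unsupported strategy fail loudly instead of falling through.

```java
// Hypothetical stand-in for UpsertConfig.Strategy from the diff.
enum Strategy { OVERWRITE, INCREMENT }

// Hypothetical merger contract: combine the stored value with the incoming one.
interface PartialUpsertMerger {
  Object merge(Object previousValue, Object newValue);
}

// Takes the incoming value; a null incoming value keeps the previous one (assumed rule).
final class OverwriteMerger implements PartialUpsertMerger {
  @Override
  public Object merge(Object previousValue, Object newValue) {
    return newValue != null ? newValue : previousValue;
  }
}

// Adds the incoming value to the previous one; a null on either side contributes nothing (assumed rule).
final class IncrementMerger implements PartialUpsertMerger {
  @Override
  public Object merge(Object previousValue, Object newValue) {
    if (newValue == null) {
      return previousValue;
    }
    if (previousValue == null) {
      return newValue;
    }
    // Assumes both values share the column's numeric type (INT, LONG, FLOAT or DOUBLE).
    Number prev = (Number) previousValue;
    Number next = (Number) newValue;
    if (prev instanceof Integer) {
      return prev.intValue() + next.intValue();
    }
    if (prev instanceof Long) {
      return prev.longValue() + next.longValue();
    }
    if (prev instanceof Float) {
      return prev.floatValue() + next.floatValue();
    }
    return prev.doubleValue() + next.doubleValue();
  }
}

// Mergers are stateless, so one shared instance per strategy is enough.
final class PartialUpsertMergerFactory {
  private static final PartialUpsertMerger OVERWRITE_MERGER = new OverwriteMerger();
  private static final PartialUpsertMerger INCREMENT_MERGER = new IncrementMerger();

  private PartialUpsertMergerFactory() {
  }

  static PartialUpsertMerger getMerger(Strategy strategy) {
    switch (strategy) {
      case OVERWRITE:
        return OVERWRITE_MERGER;
      case INCREMENT:
        return INCREMENT_MERGER;
      default:
        throw new IllegalStateException("Unsupported partial upsert strategy: " + strategy);
    }
  }
}
```

With these assumed rules, `getMerger(Strategy.INCREMENT).merge(2, null)` returns the stored `2` unchanged, which is the kind of null case a merger test would pin down.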

##########
File path: 
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
##########
@@ -30,16 +32,37 @@
     FULL, PARTIAL, NONE
   }
 
+  public enum Strategy {
+    OVERWRITE, INCREMENT

Review comment:
       Got it, resolved in https://github.com/apache/incubator-pinot/pull/6899/commits/61356a46850d3c0eeb9313845ec77d10769486ba

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
##########
@@ -350,6 +357,36 @@ public static void validateUpsertConfig(TableConfig tableConfig, Schema schema)
             .getIndexingConfig().isEnableDefaultStarTree(), "The upsert table cannot have star-tree index.");
   }
 
+  /**
+   * Validates the partial upsert-related configurations
+   *  - INCREMENT merger cannot be applied to PK.
+   *  - INCREMENT merger should be numeric data types.
+   *  - enforce nullValueHandling for partial upsert tables.
+   */
+  private static void validatePartialUpsertStrategies(Schema schema, TableConfig tableConfig) {
+    if (tableConfig.getUpsertMode() != UpsertConfig.Mode.PARTIAL) {
+      return;
+    }
+
+    Preconditions.checkState(tableConfig.getIndexingConfig().isNullHandlingEnabled(),
+        "NullValueHandling is required to be enabled for partial upsert tables.");
+
+    Map<String, UpsertConfig.Strategy> partialUpsertStrategies =
+        tableConfig.getUpsertConfig().getPartialUpsertStrategies();
+
+    for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
+      Set<FieldSpec.DataType> numericsDataType = new HashSet<>(Arrays.asList(INT, LONG, FLOAT, DOUBLE));

Review comment:
       Thanks @yupeng9, I added one more validation rule to prevent date-time fields with a numeric data type from being "incremented":
https://github.com/apache/incubator-pinot/pull/6899/commits/372e8e639b18418917f0e965cc9b6e2f189c7c02
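The rules discussed here (null handling required, no INCREMENT on the primary key, INCREMENT only on INT/LONG/FLOAT/DOUBLE, and no INCREMENT on date-time columns) can be sketched as one standalone check. This is a hypothetical illustration, not Pinot's `TableConfigUtils`: the schema is reduced to plain maps and sets, `dateTimeColumns` stands in for however the real code detects time columns, and the date-time error message is invented because the diff does not show it. One design choice worth noting: the numeric-type set is built once as a constant rather than inside the loop, as the diff currently does.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins for FieldSpec.DataType and UpsertConfig.Strategy.
enum DataType { INT, LONG, FLOAT, DOUBLE, STRING, BYTES }

enum Strategy { OVERWRITE, INCREMENT }

final class PartialUpsertValidator {
  // Built once instead of on every loop iteration.
  private static final Set<DataType> NUMERIC_TYPES =
      EnumSet.of(DataType.INT, DataType.LONG, DataType.FLOAT, DataType.DOUBLE);

  private PartialUpsertValidator() {
  }

  static void validate(boolean nullHandlingEnabled, Map<String, Strategy> strategies,
      Map<String, DataType> columnTypes, Set<String> primaryKeyColumns, Set<String> dateTimeColumns) {
    check(nullHandlingEnabled, "NullValueHandling is required to be enabled for partial upsert tables.");
    for (Map.Entry<String, Strategy> entry : strategies.entrySet()) {
      // Only INCREMENT carries column restrictions; OVERWRITE is valid on any column.
      if (entry.getValue() != Strategy.INCREMENT) {
        continue;
      }
      String column = entry.getKey();
      check(!primaryKeyColumns.contains(column), "INCREMENT merger cannot be applied to PK.");
      // EnumSet.contains(null) is false, so an unknown column also fails this check.
      check(NUMERIC_TYPES.contains(columnTypes.get(column)), "INCREMENT merger should be numeric data types.");
      // Hypothetical message: numeric timestamps pass the type check, so reject them by role.
      check(!dateTimeColumns.contains(column), "INCREMENT merger cannot be applied to date time column.");
    }
  }

  private static void check(boolean condition, String message) {
    if (!condition) {
      throw new IllegalStateException(message);
    }
  }
}
```

The order of checks mirrors the Javadoc in the diff, so a LONG primary key such as `myCol1` is reported as a PK violation before its type is ever considered.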
   

##########
File path: 
pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
##########
@@ -984,17 +989,19 @@ public void testValidateIndexingConfig() {
       // Expected
     }
 
-    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setVarLengthDictionaryColumns(Arrays.asList("intCol")).
-        build();
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setVarLengthDictionaryColumns(Arrays.asList("intCol")).

Review comment:
       Thanks @yupeng9 @Jackie-Jiang, my IDE formatter might not be configured correctly. Is there a guide for this?

##########
File path: 
pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
##########
@@ -1098,4 +1105,46 @@ public void testValidateUpsertConfig() {
       Assert.assertEquals(e.getMessage(), "The upsert table cannot have star-tree index.");
     }
   }
+
+  @Test
+  public void testValidatePartialUpsertConfig() {
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol1", FieldSpec.DataType.LONG)
+            .addSingleValueDimension("myCol2", FieldSpec.DataType.STRING)
+            .setPrimaryKeyColumns(Lists.newArrayList("myCol1")).build();
+
+    Map<String, String> streamConfigs = new HashMap<>();
+    streamConfigs.put("stream.kafka.consumer.type", "highLevel");
+    streamConfigs.put("streamType", "kafka");
+    streamConfigs.put("stream.kafka.topic.name", "test");
+    streamConfigs
+        .put("stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+    streamConfigs.put("stream.kafka.consumer.type", "simple");
+
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.PARTIAL)).setNullHandlingEnabled(false)
+        .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setStreamConfigs(streamConfigs).build();
+    try {
+      TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "NullValueHandling is required to be enabled for partial upsert tables.");
+    }
+
+    Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new HashMap<>();
+    partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT);
+    try {
+      TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "INCREMENT merger cannot be applied to PK.");
+    }
+
+    partialUpsertStratgies = new HashMap<>();
+    partialUpsertStratgies.put("myCol2", UpsertConfig.Strategy.INCREMENT);
+    try {
+      TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "INCREMENT merger should be numeric data types.");

Review comment:
       Thanks @yupeng9 for pointing this out. I don't see a use case that requires incrementing a timestamp for now; please let me know if there is one.
   Since a time column can have a numeric data type, I added one more validation rule to prevent numeric timestamp values from being incremented.
   
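A note on the test pattern in this hunk: each `try { validateUpsertConfig(...) } catch (Exception e) { assertEquals(...) }` block also passes when no exception is thrown, and the `partialUpsertStratgies` maps are populated after `tableConfig` is built, so as quoted they never reach the config being validated. A minimal, hypothetical sketch of a stricter pattern follows; `ToyUpsertConfig`, `ToyValidator` and `expectFailure` are illustrative stand-ins, not Pinot or TestNG APIs. The helper throws when the action does not fail, and each case builds its config from the strategy map it means to test.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for UpsertConfig and its partial-upsert strategies.
enum Strategy { OVERWRITE, INCREMENT }

final class ToyUpsertConfig {
  private final Map<String, Strategy> partialUpsertStrategies;

  ToyUpsertConfig(Map<String, Strategy> partialUpsertStrategies) {
    // Defensive copy: entries added to the caller's map later never reach this config.
    this.partialUpsertStrategies = new HashMap<>(partialUpsertStrategies);
  }

  Map<String, Strategy> getPartialUpsertStrategies() {
    return partialUpsertStrategies;
  }
}

final class ToyValidator {
  private ToyValidator() {
  }

  // Rejects INCREMENT on the (single, hypothetical) primary-key column.
  static void validate(ToyUpsertConfig config, String primaryKeyColumn) {
    if (config.getPartialUpsertStrategies().get(primaryKeyColumn) == Strategy.INCREMENT) {
      throw new IllegalStateException("INCREMENT merger cannot be applied to PK.");
    }
  }
}

final class ExpectFailure {
  private ExpectFailure() {
  }

  // Runs an action that must throw and returns its message; fails if nothing was thrown.
  static String expectFailure(Runnable action) {
    try {
      action.run();
    } catch (RuntimeException e) {
      return e.getMessage();
    }
    throw new AssertionError("Expected validation to fail, but it passed");
  }
}
```

With this helper, a rule that silently stops firing, or a strategy map that was never attached, turns the test red instead of leaving it green.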




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
