Jackie-Jiang commented on a change in pull request #6899: URL: https://github.com/apache/incubator-pinot/pull/6899#discussion_r655602084
########## File path: pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java ########## @@ -1098,4 +1105,46 @@ public void testValidateUpsertConfig() { Assert.assertEquals(e.getMessage(), "The upsert table cannot have star-tree index."); } } + + @Test + public void testValidatePartialUpsertConfig() { + Schema schema = + new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol1", FieldSpec.DataType.LONG) + .addSingleValueDimension("myCol2", FieldSpec.DataType.STRING) + .setPrimaryKeyColumns(Lists.newArrayList("myCol1")).build(); + + Map<String, String> streamConfigs = new HashMap<>(); + streamConfigs.put("stream.kafka.consumer.type", "highLevel"); + streamConfigs.put("streamType", "kafka"); + streamConfigs.put("stream.kafka.topic.name", "test"); + streamConfigs + .put("stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"); + streamConfigs.put("stream.kafka.consumer.type", "simple"); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.PARTIAL)).setNullHandlingEnabled(false) + .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) + .setStreamConfigs(streamConfigs).build(); + try { + TableConfigUtils.validateUpsertConfig(tableConfig, schema); + } catch (Exception e) { + Assert.assertEquals(e.getMessage(), "NullValueHandling is required to be enabled for partial upsert tables."); + } + + Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new HashMap<>(); + partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT); + try { + TableConfigUtils.validateUpsertConfig(tableConfig, schema); + } catch (Exception e) { + Assert.assertEquals(e.getMessage(), "INCREMENT merger cannot be applied to PK."); + } + + partialUpsertStratgies = new HashMap<>(); + partialUpsertStratgies.put("myCol2", UpsertConfig.Strategy.INCREMENT); + try { + TableConfigUtils.validateUpsertConfig(tableConfig, schema); + } catch (Exception e) { + Assert.assertEquals(e.getMessage(), "INCREMENT merger should be numeric data types."); Review comment: Yes. IMO wait time should be long, instead of timestamp (milliseconds since epoch) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org