This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new cf71498 Adjust schema validation logic in AvroIngestionSchemaValidator (#6009) cf71498 is described below commit cf71498478f120fdc182c4aa34d156232076cd18 Author: Jialiang Li <j...@linkedin.com> AuthorDate: Wed Sep 16 09:44:39 2020 -0700 Adjust schema validation logic in AvroIngestionSchemaValidator (#6009) * Adjust schema validation logic in AvroIngestionSchemaValidator * Reduce test file sizes Co-authored-by: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> --- .../hadoop/data/IngestionSchemaValidatorTest.java | 51 +++++++++++++------ .../src/test/resources/data/test_sample_data.avro | Bin 917973 -> 2315 bytes .../data/test_sample_data_multi_value.avro | Bin 0 -> 5108 bytes .../avro/AvroIngestionSchemaValidator.java | 54 +++++++++++++++------ 4 files changed, 73 insertions(+), 32 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java index fec3583..8cd0912 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java @@ -29,16 +29,16 @@ import org.testng.annotations.Test; public class IngestionSchemaValidatorTest { + @Test - public void testAvroIngestionSchemaValidator() + public void testAvroIngestionSchemaValidatorForSingleValueColumns() throws Exception { - String inputFilePath = new File( - Preconditions.checkNotNull(IngestionSchemaValidatorTest.class.getClassLoader().getResource("data/test_sample_data.avro")) - .getFile()).toString(); + String inputFilePath = new File(Preconditions + .checkNotNull(IngestionSchemaValidatorTest.class.getClassLoader().getResource("data/test_sample_data.avro")) + .getFile()).toString(); String recordReaderClassName = "org.apache.pinot.plugin.inputformat.avro.AvroRecordReader"; - Schema pinotSchema = new Schema.SchemaBuilder() - .addSingleValueDimension("column1", FieldSpec.DataType.LONG) + Schema pinotSchema = new Schema.SchemaBuilder().addSingleValueDimension("column1", FieldSpec.DataType.LONG) .addSingleValueDimension("column2", FieldSpec.DataType.INT) .addSingleValueDimension("column3", FieldSpec.DataType.STRING) .addSingleValueDimension("column7", FieldSpec.DataType.STRING) @@ -53,8 +53,7 @@ public class IngestionSchemaValidatorTest { Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected()); // Adding one extra column - pinotSchema = new Schema.SchemaBuilder() - .addSingleValueDimension("column1", FieldSpec.DataType.LONG) + pinotSchema = new Schema.SchemaBuilder().addSingleValueDimension("column1", FieldSpec.DataType.LONG) .addSingleValueDimension("column2", FieldSpec.DataType.INT) .addSingleValueDimension("column3", FieldSpec.DataType.STRING) .addSingleValueDimension("extra_column", FieldSpec.DataType.STRING) @@ -69,11 +68,9 @@ public class IngestionSchemaValidatorTest { Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected()); Assert.assertTrue(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected()); Assert.assertNotNull(ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason()); - System.out.println(ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason()); // Change the data type of column1 from LONG to STRING - pinotSchema = new Schema.SchemaBuilder() - .addSingleValueDimension("column1", FieldSpec.DataType.STRING) + pinotSchema = new Schema.SchemaBuilder().addSingleValueDimension("column1", FieldSpec.DataType.STRING) .addSingleValueDimension("column2", FieldSpec.DataType.INT) .addSingleValueDimension("column3", FieldSpec.DataType.STRING) .addSingleValueDimension("column7", FieldSpec.DataType.STRING) @@ -83,14 +80,12 @@ public class IngestionSchemaValidatorTest { Assert.assertNotNull(ingestionSchemaValidator); Assert.assertTrue(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected()); Assert.assertNotNull(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason()); - System.out.println(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason()); Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()); Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected()); Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected()); // Change column2 from single-value column to multi-value column - pinotSchema = new Schema.SchemaBuilder() - .addSingleValueDimension("column1", FieldSpec.DataType.LONG) + pinotSchema = new Schema.SchemaBuilder().addSingleValueDimension("column1", FieldSpec.DataType.LONG) .addMultiValueDimension("column2", FieldSpec.DataType.INT) .addSingleValueDimension("column3", FieldSpec.DataType.STRING) .addSingleValueDimension("column7", FieldSpec.DataType.STRING) @@ -101,11 +96,35 @@ public class IngestionSchemaValidatorTest { Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected()); Assert.assertTrue(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()); Assert.assertNotNull(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason()); - System.out.println(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason()); Assert.assertTrue(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected()); Assert.assertNotNull(ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason()); - System.out.println(ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason()); Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected()); + } + + @Test + public void testAvroIngestionValidatorForMultiValueColumns() + throws Exception { + String inputFilePath = new File(Preconditions.checkNotNull( + IngestionSchemaValidatorTest.class.getClassLoader().getResource("data/test_sample_data_multi_value.avro")) + .getFile()).toString(); + String recordReaderClassName = "org.apache.pinot.plugin.inputformat.avro.AvroRecordReader"; + + // column 2 is of int type in the AVRO. + // column3 and column16 are both of array of map structure. + // metric_not_found doesn't exist in input AVRO + Schema pinotSchema = new Schema.SchemaBuilder().addSingleValueDimension("column1", FieldSpec.DataType.STRING) + .addSingleValueDimension("column2", FieldSpec.DataType.LONG) + .addSingleValueDimension("column3", FieldSpec.DataType.STRING) + .addMultiValueDimension("column16", FieldSpec.DataType.STRING) + .addMetric("metric_not_found", FieldSpec.DataType.LONG) + .addMetric("metric_nus_impressions", FieldSpec.DataType.LONG).build(); + IngestionSchemaValidator ingestionSchemaValidator = + SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath); + Assert.assertNotNull(ingestionSchemaValidator); + Assert.assertTrue(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected()); + Assert.assertTrue(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()); + Assert.assertTrue(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected()); + Assert.assertTrue(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected()); } } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data.avro b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data.avro index d2e6404..6053058 100644 Binary files a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data.avro and b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data.avro differ diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro new file mode 100644 index 0000000..bbbb73a Binary files /dev/null and b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro differ diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java index d0ee84f..429cdf2 100644 --- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java +++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java @@ -97,6 +97,7 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator { fieldSpec.getDataType().name(), getInputSchemaType())); continue; } + String avroColumnName = avroColumnField.schema().getName(); org.apache.avro.Schema avroColumnSchema = avroColumnField.schema(); org.apache.avro.Schema.Type avroColumnType = avroColumnSchema.getType(); if (avroColumnType == org.apache.avro.Schema.Type.UNION) { @@ -111,36 +112,57 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator { } } if (nonNullSchema != null) { + avroColumnSchema = nonNullSchema; avroColumnType = nonNullSchema.getType(); } } - if (!fieldSpec.getDataType().name().equalsIgnoreCase(avroColumnType.toString())) { - _dataTypeMismatch.addMismatchReason(String - .format("The Pinot column: (%s: %s) doesn't match with the column (%s: %s) in input %s schema.", columnName, - fieldSpec.getDataType().name(), avroColumnSchema.getName(), avroColumnType.toString(), - getInputSchemaType())); - } - if (fieldSpec.isSingleValueField()) { + // check single-value multi-value mismatch if (avroColumnType.ordinal() < org.apache.avro.Schema.Type.STRING.ordinal()) { - // the column is a complex structure - _singleValueMultiValueFieldMismatch.addMismatchReason(String.format( - "The Pinot column: %s is 'single-value' column but the column: %s from input %s is 'multi-value' column.", - columnName, avroColumnSchema.getName(), getInputSchemaType())); + _singleValueMultiValueFieldMismatch.addMismatchReason(String + .format( + "The Pinot column: %s is 'single-value' column but the column: %s from input %s is 'multi-value' column.", + columnName, avroColumnName, getInputSchemaType())); + } + FieldSpec.DataType dataTypeForSVColumn = AvroUtils.extractFieldDataType(avroColumnField); + // check data type mismatch + if (fieldSpec.getDataType() != dataTypeForSVColumn) { + _dataTypeMismatch.addMismatchReason(String + .format("The Pinot column: (%s: %s) doesn't match with the column (%s: %s) in input %s schema.", columnName, + fieldSpec.getDataType().name(), avroColumnName, avroColumnType.name(), + getInputSchemaType())); } } else { + // check single-value multi-value mismatch if (avroColumnType.ordinal() >= org.apache.avro.Schema.Type.STRING.ordinal()) { - // the column is a complex structure - _singleValueMultiValueFieldMismatch.addMismatchReason(String.format( - "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is 'single-value' column.", - columnName, avroColumnSchema.getName(), getInputSchemaType())); + _singleValueMultiValueFieldMismatch.addMismatchReason(String + .format( + "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is 'single-value' column.", + columnName, avroColumnName, getInputSchemaType())); } + // check data type mismatch + FieldSpec.DataType dataTypeForMVColumn = AvroUtils.extractFieldDataType(avroColumnField); + if (fieldSpec.getDataType() != dataTypeForMVColumn) { + _dataTypeMismatch.addMismatchReason(String + .format("The Pinot column: (%s: %s) doesn't match with the column (%s: %s) in input %s schema.", + columnName, fieldSpec.getDataType().name(), avroColumnName, dataTypeForMVColumn.name(), + getInputSchemaType())); + } + // check multi-value column structure mismatch if (avroColumnType != org.apache.avro.Schema.Type.ARRAY) { // multi-value column should use array structure for now. _multiValueStructureMismatch.addMismatchReason(String.format( "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is of '%s' type, which should have been of 'array' type.", - columnName, avroColumnSchema.getName(), getInputSchemaType(), avroColumnType.getName())); + columnName, avroColumnName, getInputSchemaType(), avroColumnType.getName())); + } else { + org.apache.avro.Schema.Type elementType = avroColumnSchema.getElementType().getType(); + if (elementType.ordinal() < org.apache.avro.Schema.Type.STRING.ordinal()) { + // even though the column schema is of array type, the element type of that array could be of complex type like array, map, etc. + _multiValueStructureMismatch.addMismatchReason(String.format( + "The Pinot column: %s is 'multi-value' column and it's of 'array' type in input %s schema, but the element type is of '%s' type, which should have been of 'primitive' type.", + columnName, getInputSchemaType(), avroColumnSchema.getElementType().getType())); + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org