This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-counter-for-detecting-schema-mismatch
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit b87c6cf725f05eab3af27ceb8aad101da80c1f2d
Author: Jack Li (Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Sun Aug 16 12:48:07 2020 -0700

    Add Hadoop counter for detecting schema mismatch
---
 .../impl/SegmentIndexCreationDriverImpl.java       |  4 ++
 .../pinot/hadoop/job/HadoopSegmentCreationJob.java |  6 ++
 .../hadoop/job/mappers/SegmentCreationMapper.java  | 81 +++++++++++++++++++++-
 3 files changed, 90 insertions(+), 1 deletion(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 3658896..3ebba6d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -127,6 +127,10 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
     }
   }
 
+  public RecordReader getRecordReader() {
+    return recordReader;
+  }
+
   public void init(SegmentGeneratorConfig config, RecordReader recordReader) {
     init(config, new RecordReaderSegmentCreationDataSource(recordReader),
         CompositeTransformer.getDefaultTransformer(config.getTableConfig(), config.getSchema()));

diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentCreationJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentCreationJob.java
index b42c92e..10857d1 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentCreationJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentCreationJob.java
@@ -176,4 +176,10 @@ public class HadoopSegmentCreationJob extends SegmentCreationJob {
     Path segmentTarDir = new Path(new Path(_stagingDir, "output"), JobConfigConstants.SEGMENT_TAR_DIR);
     movePath(_outputDirFileSystem, segmentTarDir.toString(), _outputDir, true);
   }
+
+  public enum SchemaMisMatchCounter {
+    DATA_TYPE_MISMATCH,
+    SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH,
+    MULTI_VALUE_FIELD_STRUCTURE_MISMATCH
+  }
 }
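[Reviewer note, not part of the patch] Once the MapReduce job completes, counters registered through an enum like SchemaMisMatchCounter can be read back by the submitting process through the standard Hadoop Counters API. A minimal sketch, assuming the launcher still holds the org.apache.hadoop.mapreduce.Job handle it used to submit the job (the helper class below is illustrative):

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pinot.hadoop.job.HadoopSegmentCreationJob.SchemaMisMatchCounter;

    public class SchemaMismatchReportSketch {

      // Reads the aggregate (all-mapper) counter values; call this after
      // job.waitForCompletion(true) has returned.
      public static void report(Job job) throws IOException {
        Counters counters = job.getCounters();
        long dataType =
            counters.findCounter(SchemaMisMatchCounter.DATA_TYPE_MISMATCH).getValue();
        long singleMulti =
            counters.findCounter(SchemaMisMatchCounter.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH).getValue();
        long multiValueStructure =
            counters.findCounter(SchemaMisMatchCounter.MULTI_VALUE_FIELD_STRUCTURE_MISMATCH).getValue();
        if (dataType + singleMulti + multiValueStructure > 0) {
          // What to do with the counts (log, alert, fail the push job) is the caller's policy.
          System.err.println("Schema mismatches detected: dataType=" + dataType
              + ", singleValue/multiValue=" + singleMulti
              + ", multiValueStructure=" + multiValueStructure);
        }
      }
    }

The same totals should also show up in the job's counter listing (e.g. the JobHistory UI), grouped under the enum's fully qualified class name.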
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index 4a31da8..3d1aac5 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -20,11 +20,16 @@ package org.apache.pinot.hadoop.job.mappers;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -41,16 +46,21 @@ import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl
 import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
 import org.apache.pinot.core.segment.name.SegmentNameGenerator;
 import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.hadoop.job.HadoopSegmentCreationJob;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
 import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
+import org.apache.pinot.plugin.inputformat.orc.ORCRecordReader;
 import org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReaderConfig;
 import org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -85,6 +95,11 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
   protected File _localSegmentDir;
   protected File _localSegmentTarDir;
 
+  // Counters for detecting schema mismatches
+  private int _dataTypeMismatch;
+  private int _singleValueMultiValueFieldMismatch;
+  private int _multiValueStructureMismatch;
+
   /**
    * Generate a relative output directory path when `useRelativePath` flag is on.
    * This method will compute the relative path based on `inputFile` and `baseInputDir`,
@@ -243,7 +258,7 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
 
     addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig, hdfsInputFile, sequenceId);
     _logger.info("Start creating segment with sequence id: {}", sequenceId);
-    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
 
     // Start a thread that reports progress every minute during segment generation to prevent job getting killed
     Thread progressReporterThread = new Thread(getProgressReporter(context));
@@ -251,6 +266,7 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
     progressReporterThread.start();
     try {
       driver.init(segmentGeneratorConfig);
+      validateSchema(segmentGeneratorConfig, driver.getRecordReader());
      driver.build();
     } catch (Exception e) {
       _logger.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile,
@@ -353,8 +369,71 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
       int sequenceId) {
   }
 
+  public void validateSchema(SegmentGeneratorConfig segmentGeneratorConfig, RecordReader recordReader) {
+    if (recordReader instanceof AvroRecordReader) {
+      validateSchema(segmentGeneratorConfig.getInputFilePath(), FileFormat.AVRO);
+    } else if (recordReader instanceof ORCRecordReader) {
+      validateSchema(segmentGeneratorConfig.getInputFilePath(), FileFormat.ORC);
+    }
+  }
+
+  private void validateSchema(String inputPath, FileFormat fileFormat) {
+    if (fileFormat == FileFormat.AVRO) {
+      org.apache.avro.Schema avroSchema = extractAvroSchemaFromFile(inputPath);
+      for (String columnName : _schema.getPhysicalColumnNames()) {
+        FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+        org.apache.avro.Schema.Type avroColumnType = avroSchema.getField(columnName).schema().getType();
+        if (!fieldSpec.getDataType().name().equalsIgnoreCase(avroColumnType.toString())) {
+          _dataTypeMismatch++;
+        }
+
+        if (fieldSpec.isSingleValueField()) {
+          if (avroColumnType.ordinal() < org.apache.avro.Schema.Type.STRING.ordinal()) {
+            // the Avro column is a complex structure, but the Pinot field is single-value
+            _singleValueMultiValueFieldMismatch++;
+          }
+        } else {
+          if (avroColumnType.ordinal() >= org.apache.avro.Schema.Type.STRING.ordinal()) {
+            // the Avro column is a primitive type, but the Pinot field is multi-value
+            _singleValueMultiValueFieldMismatch++;
+          }
+          if (avroColumnType != org.apache.avro.Schema.Type.ARRAY) {
+            // multi-value columns should use the Avro array structure for now
+            _multiValueStructureMismatch++;
+          }
+        }
+      }
+    }
+    // TODO: add validation for other formats like ORC
+  }
+
+  private org.apache.avro.Schema extractAvroSchemaFromFile(String inputPath) {
+    try {
+      DataFileStream<GenericRecord> dataStreamReader = getAvroReader(inputPath);
+      org.apache.avro.Schema avroSchema = dataStreamReader.getSchema();
+      dataStreamReader.close();
+      return avroSchema;
+    } catch (IOException e) {
+      throw new RuntimeException("IOException when extracting avro schema from input path: " + inputPath, e);
+    }
+  }
+
+  private DataFileStream<GenericRecord> getAvroReader(String inputPath)
+      throws IOException {
+    try {
+      InputStream inputStream = new FileInputStream(inputPath);
+      return new DataFileStream<>(inputStream, new GenericDatumReader<>());
+    } catch (FileNotFoundException e) {
+      throw new RuntimeException("File " + inputPath + " not found", e);
+    }
+  }
+
   @Override
   public void cleanup(Context context) {
+    context.getCounter(HadoopSegmentCreationJob.SchemaMisMatchCounter.DATA_TYPE_MISMATCH).increment(_dataTypeMismatch);
+    context.getCounter(HadoopSegmentCreationJob.SchemaMisMatchCounter.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH)
+        .increment(_singleValueMultiValueFieldMismatch);
+    context.getCounter(HadoopSegmentCreationJob.SchemaMisMatchCounter.MULTI_VALUE_FIELD_STRUCTURE_MISMATCH).increment(_multiValueStructureMismatch);
     _logger.info("Deleting local temporary directory: {}", _localStagingDir);
     FileUtils.deleteQuietly(_localStagingDir);
   }
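[Reviewer note, not part of the patch] The single-value/multi-value checks above rely on the declaration order of org.apache.avro.Schema.Type, in which the complex types (RECORD, ENUM, ARRAY, MAP, UNION, FIXED) all precede STRING, so type.ordinal() < Schema.Type.STRING.ordinal() acts as a compact complex-type test. Below is a self-contained sketch of the same classification (class and method names are illustrative); it also demonstrates one caveat: Avro models a nullable scalar as a union such as ["null", "long"], whose top-level type is UNION and therefore classifies as complex unless the union is unwrapped first.

    import java.util.Arrays;
    import org.apache.avro.Schema;

    public class AvroTypeOrdinalSketch {

      // Same test as in validateSchema(): in Schema.Type the complex types
      // (RECORD, ENUM, ARRAY, MAP, UNION, FIXED) are declared before STRING.
      static boolean isComplex(Schema.Type type) {
        return type.ordinal() < Schema.Type.STRING.ordinal();
      }

      public static void main(String[] args) {
        System.out.println(isComplex(Schema.Type.ARRAY)); // true  -> expected for a multi-value field
        System.out.println(isComplex(Schema.Type.LONG));  // false -> expected for a single-value field

        // A nullable scalar column is a union ["null", "long"]: its top-level
        // type is UNION, so it would be counted as a single-value/multi-value
        // mismatch even though its values are logically scalar.
        Schema nullableLong = Schema.createUnion(
            Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG)));
        System.out.println(isComplex(nullableLong.getType())); // true (UNION)

        // Unwrapping the union before the check avoids that false positive.
        Schema unwrapped = nullableLong.getTypes().stream()
            .filter(s -> s.getType() != Schema.Type.NULL)
            .findFirst()
            .orElse(nullableLong);
        System.out.println(isComplex(unwrapped.getType())); // false (LONG)
      }
    }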
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org