This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-counter-for-detecting-schema-mismatch
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit b87c6cf725f05eab3af27ceb8aad101da80c1f2d
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Sun Aug 16 12:48:07 2020 -0700

    Add Hadoop counter for detecting schema mismatch
---
 .../impl/SegmentIndexCreationDriverImpl.java       |  4 ++
 .../pinot/hadoop/job/HadoopSegmentCreationJob.java |  6 ++
 .../hadoop/job/mappers/SegmentCreationMapper.java  | 81 +++++++++++++++++++++-
 3 files changed, 90 insertions(+), 1 deletion(-)
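
Summary: SegmentIndexCreationDriverImpl now exposes its RecordReader; HadoopSegmentCreationJob gains a SchemaMisMatchCounter enum with three mismatch categories; and SegmentCreationMapper validates the Avro input schema against the Pinot schema at segment-creation time, publishing the per-mapper mismatch counts as Hadoop counters in cleanup().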

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 3658896..3ebba6d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -127,6 +127,10 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
     }
   }
 
+  public RecordReader getRecordReader() {
+    return recordReader;
+  }
+
   public void init(SegmentGeneratorConfig config, RecordReader recordReader) {
     init(config, new RecordReaderSegmentCreationDataSource(recordReader),
         CompositeTransformer.getDefaultTransformer(config.getTableConfig(), config.getSchema()));
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentCreationJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentCreationJob.java
index b42c92e..10857d1 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentCreationJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentCreationJob.java
@@ -176,4 +176,10 @@ public class HadoopSegmentCreationJob extends SegmentCreationJob {
     Path segmentTarDir = new Path(new Path(_stagingDir, "output"), JobConfigConstants.SEGMENT_TAR_DIR);
     movePath(_outputDirFileSystem, segmentTarDir.toString(), _outputDir, true);
   }
+
+  public enum SchemaMisMatchCounter {
+    DATA_TYPE_MISMATCH,
+    SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH,
+    MULTI_VALUE_FIELD_STRUCTURE_MISMATCH
+  }
 }
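
For context (not part of this commit): Hadoop groups enum-based counters under the enum's fully-qualified class name, so the three counters above can be read back from the finished job. A minimal sketch, assuming `job` is the completed org.apache.hadoop.mapreduce.Job that ran the segment-creation mappers:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pinot.hadoop.job.HadoopSegmentCreationJob.SchemaMisMatchCounter;

    public class SchemaMismatchReport {
      // Sketch only: `job` is assumed to be the completed Job that ran SegmentCreationMapper.
      static void report(Job job) throws IOException {
        Counters counters = job.getCounters();
        for (SchemaMisMatchCounter counter : SchemaMisMatchCounter.values()) {
          // A non-zero value means some input columns disagreed with the Pinot schema.
          System.out.println(counter + " = " + counters.findCounter(counter).getValue());
        }
      }
    }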
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index 4a31da8..3d1aac5 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -20,11 +20,17 @@ package org.apache.pinot.hadoop.job.mappers;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroRecordReader;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -41,16 +47,20 @@ import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl
 import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
 import org.apache.pinot.core.segment.name.SegmentNameGenerator;
 import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.hadoop.job.HadoopSegmentCreationJob;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
 import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
+import org.apache.pinot.plugin.inputformat.orc.ORCRecordReader;
 import org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReaderConfig;
 import org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -85,6 +95,11 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
   protected File _localSegmentDir;
   protected File _localSegmentTarDir;
 
+  // Counters for detecting schema mismatches
+  private int _dataTypeMismatch;
+  private int _singleValueMultiValueFieldMismatch;
+  private int _multiValueStructureMismatch;
+
   /**
    * Generate a relative output directory path when `useRelativePath` flag is on.
    * This method will compute the relative path based on `inputFile` and `baseInputDir`,
@@ -243,7 +258,7 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
     addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig, hdfsInputFile, sequenceId);
 
     _logger.info("Start creating segment with sequence id: {}", sequenceId);
-    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
 
     // Start a thread that reports progress every minute during segment generation to prevent job getting killed
     Thread progressReporterThread = new Thread(getProgressReporter(context));
@@ -251,6 +266,7 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
     progressReporterThread.start();
     try {
       driver.init(segmentGeneratorConfig);
+      validateSchema(segmentGeneratorConfig, driver.getRecordReader());
       driver.build();
     } catch (Exception e) {
       _logger.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile,
@@ -353,8 +369,71 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
       int sequenceId) {
   }
 
+  public void validateSchema(SegmentGeneratorConfig segmentGeneratorConfig, RecordReader recordReader) {
+    if (recordReader instanceof AvroRecordReader) {
+      validateSchema(segmentGeneratorConfig.getInputFilePath(), FileFormat.AVRO);
+    } else if (recordReader instanceof ORCRecordReader) {
+      validateSchema(segmentGeneratorConfig.getInputFilePath(), FileFormat.ORC);
+    }
+  }
+
+  private void validateSchema(String inputPath, FileFormat fileFormat) {
+    if (fileFormat == FileFormat.AVRO) {
+      org.apache.avro.Schema avroSchema = extractAvroSchemaFromFile(inputPath);
+      for (String columnName : _schema.getPhysicalColumnNames()) {
+        FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+        org.apache.avro.Schema.Type avroColumnType = avroSchema.getField(columnName).schema().getType();
+        if (!fieldSpec.getDataType().name().equalsIgnoreCase(avroColumnType.toString())) {
+          _dataTypeMismatch++;
+        }
+
+        if (fieldSpec.isSingleValueField()) {
+          if (avroColumnType.ordinal() < org.apache.avro.Schema.Type.STRING.ordinal()) {
+            // the column is a complex structure
+            _singleValueMultiValueFieldMismatch++;
+          }
+        } else {
+          if (avroColumnType.ordinal() >= org.apache.avro.Schema.Type.STRING.ordinal()) {
+            // the column is a primitive (single-value) type, but the Pinot field is multi-value
+            _singleValueMultiValueFieldMismatch++;
+          }
+          if (avroColumnType != org.apache.avro.Schema.Type.ARRAY) {
+            // multi-value column should use array structure for now.
+            _multiValueStructureMismatch++;
+          }
+        }
+      }
+    }
+    // TODO: add validation for other formats like ORC
+  }
+
+  private org.apache.avro.Schema extractAvroSchemaFromFile(String inputPath) {
+    try {
+      DataFileStream<GenericRecord> dataStreamReader = getAvroReader(inputPath);
+      org.apache.avro.Schema avroSchema = dataStreamReader.getSchema();
+      dataStreamReader.close();
+      return avroSchema;
+    } catch (IOException e) {
+      throw new RuntimeException("IOException when extracting avro schema from input path: " + inputPath, e);
+    }
+  }
+
+  private DataFileStream<GenericRecord> getAvroReader(String inputPath)
+      throws IOException {
+    try {
+      InputStream inputStream = new FileInputStream(inputPath);
+      return new DataFileStream<>(inputStream, new GenericDatumReader<>());
+    } catch (FileNotFoundException e) {
+      throw new RuntimeException("File " + inputPath + " not found", e);
+    }
+  }
+
   @Override
   public void cleanup(Context context) {
+    context.getCounter(HadoopSegmentCreationJob.SchemaMisMatchCounter.DATA_TYPE_MISMATCH).increment(_dataTypeMismatch);
+    context.getCounter(HadoopSegmentCreationJob.SchemaMisMatchCounter.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH)
+        .increment(_singleValueMultiValueFieldMismatch);
+    context.getCounter(HadoopSegmentCreationJob.SchemaMisMatchCounter.MULTI_VALUE_FIELD_STRUCTURE_MISMATCH).increment(_multiValueStructureMismatch);
     _logger.info("Deleting local temporary directory: {}", _localStagingDir);
     FileUtils.deleteQuietly(_localStagingDir);
   }
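
A note on the ordinal comparisons in validateSchema() above: in org.apache.avro.Schema.Type, the complex types (RECORD, ENUM, ARRAY, MAP, UNION, FIXED) are declared before STRING, so `type.ordinal() < STRING.ordinal()` separates complex from primitive Avro types. A standalone sketch of that check (assumes the Avro 1.x declaration order):

    import org.apache.avro.Schema;

    public class AvroTypeOrdinalSketch {
      // Complex Avro types precede STRING in Schema.Type, so the ordinal
      // check below mirrors the one used in validateSchema().
      static boolean isComplex(Schema.Type type) {
        return type.ordinal() < Schema.Type.STRING.ordinal();
      }

      public static void main(String[] args) {
        System.out.println(isComplex(Schema.Type.ARRAY));  // true: could back a multi-value field
        System.out.println(isComplex(Schema.Type.STRING)); // false: single-value primitive
      }
    }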

