This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-option-to-fail-offline-job-on-empty-record
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 6b8e0a9a4b536556472e47a606d12d9149f962c8
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Tue Mar 9 15:01:24 2021 -0800

    Add an option to fail segment creation job when getting empty files
---
 .../core/indexsegment/generator/SegmentGeneratorConfig.java      | 9 +++++++++
 .../segment/creator/impl/SegmentIndexCreationDriverImpl.java     | 3 +++
 .../apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java   | 3 +++
 3 files changed, 15 insertions(+)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index 8481e5e..07595a6 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -105,6 +105,7 @@ public class SegmentGeneratorConfig implements Serializable 
{
   private boolean _onHeap = false;
   private boolean _skipTimeValueCheck = false;
   private boolean _nullHandlingEnabled = false;
+  private boolean _failOnEmptyRecord = false;
 
   // constructed from FieldConfig
   private Map<String, Map<String, String>> _columnProperties = new HashMap<>();
@@ -670,4 +671,12 @@ public class SegmentGeneratorConfig implements 
Serializable {
   public void setNullHandlingEnabled(boolean nullHandlingEnabled) {
     _nullHandlingEnabled = nullHandlingEnabled;
   }
+
+  public boolean isFailOnEmptyRecord() {
+    return _failOnEmptyRecord;
+  }
+
+  public void setFailOnEmptyRecord(boolean failOnEmptyRecord) {
+    _failOnEmptyRecord = failOnEmptyRecord;
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 19ec420..8bdcd5c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -146,6 +146,9 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
     this.config = config;
     recordReader = dataSource.getRecordReader();
     dataSchema = config.getSchema();
+    if (config.isFailOnEmptyRecord()) {
+      Preconditions.checkState(recordReader.hasNext(), "No record in data 
source");
+    }
 
     _recordTransformer = recordTransformer;
 
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index a54616f..00d672f 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -255,6 +255,9 @@ public class SegmentCreationMapper extends 
Mapper<LongWritable, Text, LongWritab
       segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat));
     }
     segmentGeneratorConfig.setOnHeap(true);
+    // Fail the job when the input contains no records, to detect potential 
upstream issues early.
+    // This is useful because relaxing this constraint in offline jobs could 
let unexpected issues go unnoticed.
+    segmentGeneratorConfig.setFailOnEmptyRecord(true);
 
     addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig, 
hdfsInputFile, sequenceId);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to